[ 
https://issues.apache.org/jira/browse/BEAM-3848?focusedWorklogId=89354&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-89354
 ]

ASF GitHub Bot logged work on BEAM-3848:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 10/Apr/18 12:03
            Start Date: 10/Apr/18 12:03
    Worklog Time Spent: 10m 
      Work Description: iemejia closed pull request #4905: [BEAM-3848] Enables 
ability to retry Solr writes on error (SolrIO)
URL: https://github.com/apache/beam/pull/4905
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/java/io/solr/pom.xml b/sdks/java/io/solr/pom.xml
index 53e4d1fa17c..a0d93a26e41 100644
--- a/sdks/java/io/solr/pom.xml
+++ b/sdks/java/io/solr/pom.xml
@@ -55,6 +55,16 @@
             <artifactId>commons-compress</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>joda-time</groupId>
+            <artifactId>joda-time</artifactId>
+        </dependency>
+
         <!-- compile dependencies -->
         <dependency>
             <groupId>com.google.auto.value</groupId>
@@ -101,6 +111,13 @@
             <classifier>tests</classifier>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.beam</groupId>
+            <artifactId>beam-sdks-java-core</artifactId>
+            <scope>test</scope>
+            <classifier>tests</classifier>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.beam</groupId>
             <artifactId>beam-runners-direct-java</artifactId>
@@ -129,24 +146,23 @@
         </dependency>
 
         <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-api</artifactId>
+            <groupId>com.carrotsearch.randomizedtesting</groupId>
+            <artifactId>randomizedtesting-runner</artifactId>
+            <version>2.3.2</version>
             <scope>test</scope>
         </dependency>
 
         <dependency>
             <groupId>com.carrotsearch.randomizedtesting</groupId>
-            <artifactId>randomizedtesting-runner</artifactId>
+            <artifactId>junit4-ant</artifactId>
             <version>2.3.2</version>
             <scope>test</scope>
         </dependency>
 
         <dependency>
             <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-log4j12</artifactId>
-            <version>${slf4j.version}</version>
+            <artifactId>slf4j-jdk14</artifactId>
             <scope>test</scope>
         </dependency>
     </dependencies>
-
 </project>
diff --git 
a/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/SolrIO.java 
b/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/SolrIO.java
index 0384417c6f9..b80abf96c8d 100644
--- a/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/SolrIO.java
+++ b/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/SolrIO.java
@@ -22,6 +22,7 @@
 
 import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.io.IOException;
@@ -33,10 +34,12 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.function.Predicate;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.Coder;
@@ -46,6 +49,10 @@
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.BackOffUtils;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.Sleeper;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
@@ -62,6 +69,7 @@
 import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.client.solrj.response.schema.SchemaResponse;
 import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
@@ -72,6 +80,9 @@
 import org.apache.solr.common.params.CursorMarkParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.NamedList;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Transforms for reading and writing data from/to Solr.
@@ -110,10 +121,15 @@
  * 
inputDocs.apply(SolrIO.write().to("my-collection").withConnectionConfiguration(conn));
  *
  * }</pre>
+ *
+ * <p>When writing it is possible to customise the retry behavior should an 
error be encountered. By
+ * default this is disabled and only one attempt will be made to write to SOLR.
  */
 @Experimental(Experimental.Kind.SOURCE_SINK)
 public class SolrIO {
 
+  private static final Logger LOG = LoggerFactory.getLogger(SolrIO.class);
+
   public static Read read() {
     // 1000 for batch size is good enough in many cases,
     // ex: if document size is large, around 10KB, the response's size will be 
around 10MB
@@ -124,7 +140,7 @@ public static Read read() {
   public static Write write() {
     // 1000 for batch size is good enough in many cases,
     // ex: if document size is large, around 10KB, the request's size will be 
around 10MB
-    // if document seize is small, around 1KB, the request's size will be 
around 1MB
+    // if document size is small, around 1KB, the request's size will be 
around 1MB
     return new AutoValue_SolrIO_Write.Builder().setMaxBatchSize(1000).build();
   }
 
@@ -200,6 +216,94 @@ private HttpClient createHttpClient() {
     }
   }
 
+  /**
+   * A POJO encapsulating a configuration for retry behavior when issuing 
requests to Solr. A retry
+   * will be attempted until the maxAttempts or maxDuration is exceeded, 
whichever comes first, for
+   * any of the following exceptions:
+   *
+   * <ul>
+   *   <li>{@link IOException}
+   *   <li>{@link SolrServerException}
+   *   <li>{@link SolrException} where the {@link SolrException.ErrorCode} is 
one of:
+   *       <ul>
+   *         <li>{@link SolrException.ErrorCode#CONFLICT}
+   *         <li>{@link SolrException.ErrorCode#SERVER_ERROR}
+   *         <li>{@link SolrException.ErrorCode#SERVICE_UNAVAILABLE}
+   *         <li>{@link SolrException.ErrorCode#INVALID_STATE}
+   *         <li>{@link SolrException.ErrorCode#UNKNOWN}
+   *       </ul>
+   * </ul>
+   */
+  @AutoValue
+  public abstract static class RetryConfiguration implements Serializable {
+    @VisibleForTesting
+    static final RetryPredicate DEFAULT_RETRY_PREDICATE = new 
DefaultRetryPredicate();
+
+    abstract int getMaxAttempts();
+
+    abstract Duration getMaxDuration();
+
+    abstract RetryPredicate getRetryPredicate();
+
+    abstract Builder builder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract SolrIO.RetryConfiguration.Builder setMaxAttempts(int 
maxAttempts);
+
+      abstract SolrIO.RetryConfiguration.Builder setMaxDuration(Duration 
maxDuration);
+
+      abstract SolrIO.RetryConfiguration.Builder 
setRetryPredicate(RetryPredicate retryPredicate);
+
+      abstract SolrIO.RetryConfiguration build();
+    }
+
+    public static RetryConfiguration create(int maxAttempts, Duration 
maxDuration) {
+      checkArgument(maxAttempts > 0, "maxAttempts must be greater than 0");
+      checkArgument(
+              maxDuration != null && maxDuration.isLongerThan(Duration.ZERO),
+              "maxDuration must be greater than 0");
+      return new AutoValue_SolrIO_RetryConfiguration.Builder()
+          .setMaxAttempts(maxAttempts)
+          .setMaxDuration(maxDuration)
+          .setRetryPredicate(DEFAULT_RETRY_PREDICATE)
+          .build();
+    }
+
+    // Exposed only to allow tests to easily simulate server errors
+    @VisibleForTesting
+    RetryConfiguration withRetryPredicate(RetryPredicate predicate) {
+      checkArgument(predicate != null, "predicate must be provided");
+      return builder().setRetryPredicate(predicate).build();
+    }
+
+    /**
+     * An interface used to control if we retry the Solr call when a {@link 
Throwable} occurs. If
+     * {@link RetryPredicate#test(Object)} returns true, {@link Write} tries 
to resend the
+     * requests to the Solr server if the {@link RetryConfiguration} permits 
it.
+     */
+    @FunctionalInterface
+    interface RetryPredicate extends Predicate<Throwable>, Serializable {}
+
+    /** This is the default predicate used to test if a failed Solr operation 
should be retried. */
+    private static class DefaultRetryPredicate implements RetryPredicate {
+      private static final Set<Integer> ELIGIBLE_CODES =
+          ImmutableSet.of(
+              SolrException.ErrorCode.CONFLICT.code,
+              SolrException.ErrorCode.SERVER_ERROR.code,
+              SolrException.ErrorCode.SERVICE_UNAVAILABLE.code,
+              SolrException.ErrorCode.INVALID_STATE.code,
+              SolrException.ErrorCode.UNKNOWN.code);
+
+      @Override
+      public boolean test(Throwable t) {
+        return (t instanceof IOException
+            || t instanceof SolrServerException
+            || (t instanceof SolrException && 
ELIGIBLE_CODES.contains(((SolrException) t).code())));
+      }
+    }
+  }
+
   /** A {@link PTransform} reading data from Solr. */
   @AutoValue
   public abstract static class Read extends PTransform<PBegin, 
PCollection<SolrDocument>> {
@@ -567,10 +671,10 @@ public void close() throws IOException {
     }
   }
 
+
   /** A {@link PTransform} writing data to Solr. */
   @AutoValue
   public abstract static class Write extends 
PTransform<PCollection<SolrInputDocument>, PDone> {
-
     @Nullable
     abstract ConnectionConfiguration getConnectionConfiguration();
 
@@ -581,6 +685,9 @@ public void close() throws IOException {
 
     abstract Builder builder();
 
+    @Nullable
+    abstract RetryConfiguration getRetryConfiguration();
+
     @AutoValue.Builder
     abstract static class Builder {
       abstract Builder setConnectionConfiguration(ConnectionConfiguration 
connectionConfiguration);
@@ -589,6 +696,8 @@ public void close() throws IOException {
 
       abstract Builder setMaxBatchSize(int maxBatchSize);
 
+      abstract Builder setRetryConfiguration(RetryConfiguration 
retryConfiguration);
+
       abstract Write build();
     }
 
@@ -623,11 +732,37 @@ Write withMaxBatchSize(int batchSize) {
       return builder().setMaxBatchSize(batchSize).build();
     }
 
+    /**
+     * Provides configuration to retry a failed batch call to Solr. A batch is 
considered as failed
+     * if the underlying {@link CloudSolrClient} surfaces {@link
+     * org.apache.solr.client.solrj.impl.HttpSolrClient.RemoteSolrException}, 
{@link
+     * SolrServerException} or {@link IOException}. Users should consider that 
retrying might
+     * compound the underlying problem which caused the initial failure. Users 
should also be aware
+     * that once retrying is exhausted the error is surfaced to the runner 
which <em>may</em> then
+     * opt to retry the current partition in entirety or abort if the max 
number of retries of the
+     * runner is completed. Retrying uses an exponential backoff algorithm, 
with minimum backoff of
+     * 5 seconds and then surfacing the error once the maximum number of 
retries or maximum
+     * configuration duration is exceeded.
+     *
+     * <p>Example use:
+     *
+     * <pre>{@code
+     * SolrIO.write()
+     *   .withRetryConfiguration(SolrIO.RetryConfiguration.create(10, 
Duration.standardMinutes(3))
+     *   ...
+     * }</pre>
+     *
+     * @param retryConfiguration the rules which govern the retry behavior
+     * @return the {@link Write} with retrying configured
+     */
+    public Write withRetryConfiguration(RetryConfiguration retryConfiguration) 
{
+      checkArgument(retryConfiguration != null, "retryConfiguration is 
required");
+      return builder().setRetryConfiguration(retryConfiguration).build();
+    }
+
     @Override
     public PDone expand(PCollection<SolrInputDocument> input) {
-      checkState(
-          getConnectionConfiguration() != null,
-          "withConnectionConfiguration() is required");
+      checkState(getConnectionConfiguration() != null, 
"withConnectionConfiguration() is required");
       checkState(getCollection() != null, "to() is required");
 
       input.apply(ParDo.of(new WriteFn(this)));
@@ -636,9 +771,14 @@ public PDone expand(PCollection<SolrInputDocument> input) {
 
     @VisibleForTesting
     static class WriteFn extends DoFn<SolrInputDocument, Void> {
+      @VisibleForTesting
+      static final String RETRY_ATTEMPT_LOG = "Error writing to Solr. Retry 
attempt[%d]";
 
-      private final Write spec;
+      private static final Duration RETRY_INITIAL_BACKOFF = 
Duration.standardSeconds(5);
+      private static final Duration RETRY_MAX_BACKOFF = 
Duration.standardDays(365);
 
+      private transient FluentBackoff retryBackoff; // defaults to no retrying
+      private final Write spec;
       private transient AuthorizedSolrClient solrClient;
       private Collection<SolrInputDocument> batch;
 
@@ -647,8 +787,23 @@ public PDone expand(PCollection<SolrInputDocument> input) {
       }
 
       @Setup
-      public void createClient() throws Exception {
+      public void setup() throws Exception {
         solrClient = spec.getConnectionConfiguration().createClient();
+
+        retryBackoff =
+            FluentBackoff.DEFAULT
+                .withMaxRetries(0) // default to no retrying
+                .withInitialBackoff(RETRY_INITIAL_BACKOFF)
+                .withMaxCumulativeBackoff(RETRY_MAX_BACKOFF);
+
+        if (spec.getRetryConfiguration() != null) {
+          // FluentBackoff counts retries excluding the original while we 
count attempts
+          // to remove ambiguity (hence the -1)
+          retryBackoff =
+              retryBackoff
+                  
.withMaxRetries(spec.getRetryConfiguration().getMaxAttempts() - 1)
+                  
.withMaxCumulativeBackoff(spec.getRetryConfiguration().getMaxDuration());
+        }
       }
 
       @StartBundle
@@ -670,23 +825,53 @@ public void finishBundle(FinishBundleContext context) 
throws Exception {
         flushBatch();
       }
 
-      private void flushBatch() throws IOException {
+      // Flushes the batch, implementing the retry mechanism as configured in 
the spec.
+      private void flushBatch() throws IOException, InterruptedException {
         if (batch.isEmpty()) {
           return;
         }
         try {
           UpdateRequest updateRequest = new UpdateRequest();
           updateRequest.add(batch);
-          solrClient.process(spec.getCollection(), updateRequest);
-        } catch (SolrServerException e) {
-          throw new IOException("Error writing to Solr", e);
+
+          Sleeper sleeper = Sleeper.DEFAULT;
+          BackOff backoff = retryBackoff.backoff();
+          int attempt = 0;
+          while (true) {
+            attempt++;
+            try {
+              solrClient.process(spec.getCollection(), updateRequest);
+              break;
+            } catch (Exception exception) {
+
+              // fail immediately if no retry configuration doesn't handle this
+              if (spec.getRetryConfiguration() == null
+                  || 
!spec.getRetryConfiguration().getRetryPredicate().test(exception)) {
+                throw new IOException(
+                        "Error writing to Solr (no attempt made to retry)", 
exception);
+              }
+
+              // see if we can pause and try again
+              if (!BackOffUtils.next(sleeper, backoff)) {
+                throw new IOException(
+                    String.format(
+                        "Error writing to Solr after %d attempt(s). No more 
attempts allowed",
+                        attempt),
+                    exception);
+
+              } else {
+                // Note: this used in test cases to verify behavior
+                LOG.warn(String.format(RETRY_ATTEMPT_LOG, attempt), exception);
+              }
+            }
+          }
         } finally {
           batch.clear();
         }
       }
 
       @Teardown
-      public void closeClient() throws Exception {
+      public void closeClient() throws IOException {
         if (solrClient != null) {
           solrClient.close();
         }
diff --git 
a/sdks/java/io/solr/src/test/java/org/apache/beam/sdk/io/solr/SolrIOTest.java 
b/sdks/java/io/solr/src/test/java/org/apache/beam/sdk/io/solr/SolrIOTest.java
index 14d4b2013d6..f954aabece8 100644
--- 
a/sdks/java/io/solr/src/test/java/org/apache/beam/sdk/io/solr/SolrIOTest.java
+++ 
b/sdks/java/io/solr/src/test/java/org/apache/beam/sdk/io/solr/SolrIOTest.java
@@ -17,18 +17,24 @@
  */
 package org.apache.beam.sdk.io.solr;
 
+import static 
org.apache.beam.sdk.io.solr.SolrIO.RetryConfiguration.DEFAULT_RETRY_PREDICATE;
+import static org.apache.beam.sdk.io.solr.SolrIOTestUtils.namedThreadIsAlive;
 import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.lessThan;
 
+import 
com.carrotsearch.ant.tasks.junit4.dependencies.com.google.common.collect.ImmutableSet;
 import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
 import com.google.common.io.BaseEncoding;
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.List;
+import java.util.Set;
+import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.SourceTestUtils;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -38,14 +44,19 @@
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.cloud.SolrCloudTestCase;
 import org.apache.solr.common.SolrDocument;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.ObjectReleaseTracker;
 import org.apache.solr.security.Sha256AuthenticationProvider;
+import org.joda.time.Duration;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -72,6 +83,9 @@
 
   @Rule public TestPipeline pipeline = TestPipeline.create();
 
+  @Rule
+  public final transient ExpectedLogs expectedLogs = 
ExpectedLogs.none(SolrIO.class);
+
   @BeforeClass
   public static void beforeClass() throws Exception {
     // setup credential for solr user,
@@ -117,6 +131,7 @@ public void before() throws Exception {
 
   @Rule public ExpectedException thrown = ExpectedException.none();
 
+  @Test
   public void testBadCredentials() throws IOException {
     thrown.expect(SolrException.class);
 
@@ -263,4 +278,102 @@ public void testSplit() throws Exception {
     // therefore, can not exist an empty shard.
     assertEquals("Wrong number of empty splits", expectedNumSplits, 
nonEmptySplits);
   }
+
+  /**
+   * Test that retries are invoked when Solr returns error. We invoke this by 
calling a non existing
+   * collection, and use a strategy that will retry on any SolrException. The 
logger is used to
+   * verify expected behavior.
+   */
+  @Test
+  public void testWriteRetry() throws Throwable {
+    thrown.expect(IOException.class);
+    thrown.expectMessage("Error writing to Solr");
+
+    // entry state of the release tracker to ensure we only unregister newly 
created objects
+    Set<Object> entryState = 
ImmutableSet.copyOf(ObjectReleaseTracker.OBJECTS.keySet());
+
+    SolrIO.Write write =
+        SolrIO.write()
+            .withConnectionConfiguration(connectionConfiguration)
+            .withRetryConfiguration(
+                SolrIO.RetryConfiguration.create(3, 
Duration.standardMinutes(3))
+                    .withRetryPredicate(new 
SolrIOTestUtils.LenientRetryStrategy()))
+            .to("wrong-collection");
+
+    List<SolrInputDocument> data = SolrIOTestUtils.createDocuments(NUM_DOCS);
+    pipeline.apply(Create.of(data)).apply(write);
+
+    try {
+      pipeline.run();
+
+    } catch (final Pipeline.PipelineExecutionException e) {
+
+      // Hack: await all worker threads completing (BEAM-3409)
+      int waitAttempts = 30; // defensive coding
+      while (namedThreadIsAlive("direct-runner-worker") && waitAttempts-- >= 
0) {
+        LOG.info("Pausing to allow direct-runner-worker threads to finish");
+        Thread.sleep(1000);
+      }
+
+      // remove solrClients created by us as there are no guarantees on 
Teardown here
+      for (Object o : ObjectReleaseTracker.OBJECTS.keySet()) {
+        if (o instanceof SolrZkClient && !entryState.contains(o)) {
+          LOG.info("Removing unreleased SolrZkClient");
+          ObjectReleaseTracker.release(o);
+        }
+      }
+
+      // check 2 retries were initiated by inspecting the log before passing 
on the exception
+      
expectedLogs.verifyWarn(String.format(SolrIO.Write.WriteFn.RETRY_ATTEMPT_LOG, 
1));
+      
expectedLogs.verifyWarn(String.format(SolrIO.Write.WriteFn.RETRY_ATTEMPT_LOG, 
2));
+
+      throw e.getCause();
+    }
+
+    fail("Pipeline should not have run to completion");
+  }
+
+  /**
+   * Tests predicate performs as documented.
+   */
+  @Test
+  public void testDefaultRetryPredicate() {
+    assertTrue(DEFAULT_RETRY_PREDICATE.test(new IOException("test")));
+    assertTrue(DEFAULT_RETRY_PREDICATE.test(new SolrServerException("test")));
+
+    assertTrue(
+        DEFAULT_RETRY_PREDICATE.test(new 
SolrException(SolrException.ErrorCode.CONFLICT, "test")));
+    assertTrue(
+        DEFAULT_RETRY_PREDICATE.test(
+            new SolrException(SolrException.ErrorCode.SERVER_ERROR, "test")));
+    assertTrue(
+        DEFAULT_RETRY_PREDICATE.test(
+            new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, 
"test")));
+    assertTrue(
+        DEFAULT_RETRY_PREDICATE.test(
+            new SolrException(SolrException.ErrorCode.INVALID_STATE, "test")));
+    assertTrue(
+        DEFAULT_RETRY_PREDICATE.test(new 
SolrException(SolrException.ErrorCode.UNKNOWN, "test")));
+    assertTrue(
+        DEFAULT_RETRY_PREDICATE.test(
+            new HttpSolrClient.RemoteSolrException(
+                "localhost",
+                SolrException.ErrorCode.SERVICE_UNAVAILABLE.code,
+                "test",
+                new Exception())));
+
+    assertFalse(
+        DEFAULT_RETRY_PREDICATE.test(
+            new SolrException(SolrException.ErrorCode.BAD_REQUEST, "test")));
+    assertFalse(
+        DEFAULT_RETRY_PREDICATE.test(new 
SolrException(SolrException.ErrorCode.FORBIDDEN, "test")));
+    assertFalse(
+        DEFAULT_RETRY_PREDICATE.test(new 
SolrException(SolrException.ErrorCode.NOT_FOUND, "test")));
+    assertFalse(
+        DEFAULT_RETRY_PREDICATE.test(
+            new SolrException(SolrException.ErrorCode.UNAUTHORIZED, "test")));
+    assertFalse(
+        DEFAULT_RETRY_PREDICATE.test(
+            new SolrException(SolrException.ErrorCode.UNSUPPORTED_MEDIA_TYPE, 
"test")));
+  }
 }
diff --git 
a/sdks/java/io/solr/src/test/java/org/apache/beam/sdk/io/solr/SolrIOTestUtils.java
 
b/sdks/java/io/solr/src/test/java/org/apache/beam/sdk/io/solr/SolrIOTestUtils.java
index fb99d5539bd..3bddb1b1aaa 100644
--- 
a/sdks/java/io/solr/src/test/java/org/apache/beam/sdk/io/solr/SolrIOTestUtils.java
+++ 
b/sdks/java/io/solr/src/test/java/org/apache/beam/sdk/io/solr/SolrIOTestUtils.java
@@ -20,10 +20,12 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 import org.apache.solr.client.solrj.SolrQuery;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrInputDocument;
 
 /** Test utilities to use with {@link SolrIO}. */
@@ -129,4 +131,27 @@ static long commitAndGetCurrentNumDocs(String collection, 
AuthorizedSolrClient c
     }
     return data;
   }
+
+  /**
+   * A strategy that will accept to retry on any SolrException.
+   */
+  static class LenientRetryStrategy implements 
SolrIO.RetryConfiguration.RetryPredicate {
+    @Override
+    public boolean test(Throwable throwable) {
+      return throwable instanceof SolrException;
+    }
+  }
+
+  /**
+   * A utility which will return true if at least one thread of the given name 
exists and is alive.
+   */
+  static boolean namedThreadIsAlive(String name) {
+    Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
+    for (Thread t : threadSet) {
+      if (t.getName().equals(name) && t.isAlive()) {
+        return true;
+      }
+    }
+    return false;
+  }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 89354)
    Time Spent: 7.5h  (was: 7h 20m)

> SolrIO: Improve retrying mechanism in client writes
> ---------------------------------------------------
>
>                 Key: BEAM-3848
>                 URL: https://issues.apache.org/jira/browse/BEAM-3848
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-solr
>    Affects Versions: 2.2.0, 2.3.0
>            Reporter: Tim Robertson
>            Assignee: Tim Robertson
>            Priority: Minor
>          Time Spent: 7.5h
>  Remaining Estimate: 0h
>
> A busy SOLR server is prone to return RemoteSOLRException on writing which 
> currently fails a complete task (e.g. a partition of a spark RDD being 
> written to SOLR).
> A good addition would be the ability to provide a retrying mechanism for the 
> batch in flight, rather than failing fast, which will most likely trigger a 
> much larger retry of more writes.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to