This is an automated email from the ASF dual-hosted git repository.

arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new b44a91ea6 [GOBBLIN-2088] Add retry to OH replication final catalog commit (#3976)
b44a91ea6 is described below

commit b44a91ea69dded11fc2229237026a575ec94603a
Author: Vivek Rai <[email protected]>
AuthorDate: Thu Jul 11 02:37:37 2024 +0530

    [GOBBLIN-2088] Add retry to OH replication final catalog commit (#3976)
    
    * added retries to OH replication final catalog commit
    * modified code to use retryerfactory with config
---
 .../copy/iceberg/IcebergRegisterStep.java          | 72 +++++++++++++++++-
 .../copy/iceberg/IcebergRegisterStepTest.java      | 85 ++++++++++++++++++++--
 2 files changed, 147 insertions(+), 10 deletions(-)

diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java
index ce7d11962..a2e9c80ee 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java
@@ -18,16 +18,33 @@
 package org.apache.gobblin.data.management.copy.iceberg;
 
 import java.io.IOException;
+import java.time.Duration;
+import java.util.Optional;
 import java.util.Properties;
-
-import com.google.common.annotations.VisibleForTesting;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutionException;
 
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.catalog.TableIdentifier;
 
+import com.github.rholder.retry.Attempt;
+import com.github.rholder.retry.Retryer;
+import com.github.rholder.retry.RetryListener;
+import com.github.rholder.retry.RetryException;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.annotations.VisibleForTesting;
+
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.commit.CommitStep;
+import org.apache.gobblin.util.retry.RetryerFactory;
+
+import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_INTERVAL_MS;
+import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_TIMES;
+import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_TYPE;
+import static org.apache.gobblin.util.retry.RetryerFactory.RetryType;
 
 
 /**
@@ -49,6 +66,12 @@ public class IcebergRegisterStep implements CommitStep {
   private final TableMetadata readTimeSrcTableMetadata;
   private final TableMetadata justPriorDestTableMetadata;
   private final Properties properties;
+  public static final String RETRYER_CONFIG_PREFIX = 
IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".catalog.registration.retries";
+
+  private static final Config RETRYER_FALLBACK_CONFIG = 
ConfigFactory.parseMap(ImmutableMap.of(
+      RETRY_INTERVAL_MS, TimeUnit.SECONDS.toMillis(3L),
+      RETRY_TIMES, 5,
+      RETRY_TYPE, RetryType.FIXED_ATTEMPT.name()));
 
   public IcebergRegisterStep(TableIdentifier srcTableId, TableIdentifier 
destTableId,
       TableMetadata readTimeSrcTableMetadata, TableMetadata 
justPriorDestTableMetadata,
@@ -85,11 +108,33 @@ public class IcebergRegisterStep implements CommitStep {
       if (!isJustPriorDestMetadataStillCurrent) {
         throw new IOException("error: likely concurrent writing to destination: " + determinationMsg);
       }
-      destIcebergTable.registerIcebergTable(readTimeSrcTableMetadata, 
currentDestMetadata);
+      Retryer<Void> registerRetryer = createRegisterRetryer();
+      registerRetryer.call(() -> {
+        destIcebergTable.registerIcebergTable(readTimeSrcTableMetadata, 
currentDestMetadata);
+        return null;
+      });
     } catch (IcebergTable.TableNotFoundException tnfe) {
       String msg = "Destination table (with TableMetadata) does not exist: " + 
tnfe.getMessage();
       log.error(msg);
       throw new IOException(msg, tnfe);
+    } catch (ExecutionException executionException) {
+      String msg = String.format("Failed to register iceberg table : (src: {%s}) - (dest: {%s})",
+                                  this.srcTableIdStr,
+                                  this.destTableIdStr);
+      log.error(msg, executionException);
+      throw new RuntimeException(msg, executionException.getCause());
+    } catch (RetryException retryException) {
+      String interruptedNote = Thread.currentThread().isInterrupted() ? "... then interrupted" : "";
+      String msg = String.format("Failed to register iceberg table : (src: {%s}) - (dest: {%s}) : (retried %d times) %s ",
+                                  this.srcTableIdStr,
+                                  this.destTableIdStr,
+                                  retryException.getNumberOfFailedAttempts(),
+                                  interruptedNote);
+      Throwable informativeException = 
retryException.getLastFailedAttempt().hasException()
+          ? retryException.getLastFailedAttempt().getExceptionCause()
+          : retryException;
+      log.error(msg, informativeException);
+      throw new RuntimeException(msg, informativeException);
     }
   }
 
@@ -103,4 +148,25 @@ public class IcebergRegisterStep implements CommitStep {
   protected IcebergCatalog createDestinationCatalog() throws IOException {
     return IcebergDatasetFinder.createIcebergCatalog(this.properties, 
IcebergDatasetFinder.CatalogLocation.DESTINATION);
   }
+
+  private Retryer<Void> createRegisterRetryer() {
+    Config config = ConfigFactory.parseProperties(this.properties);
+    Config retryerOverridesConfig = 
config.hasPath(IcebergRegisterStep.RETRYER_CONFIG_PREFIX)
+        ? config.getConfig(IcebergRegisterStep.RETRYER_CONFIG_PREFIX)
+        : ConfigFactory.empty();
+
+    return 
RetryerFactory.newInstance(retryerOverridesConfig.withFallback(RETRYER_FALLBACK_CONFIG),
 Optional.of(new RetryListener() {
+      @Override
+      public <V> void onRetry(Attempt<V> attempt) {
+        if (attempt.hasException()) {
+          String msg = String.format("Exception caught while registering iceberg table : (src: {%s}) - (dest: {%s}) : [attempt: %d; %s after start]",
+                                      srcTableIdStr,
+                                      destTableIdStr,
+                                      attempt.getAttemptNumber(),
+                                      
Duration.ofMillis(attempt.getDelaySinceFirstAttempt()).toString());
+          log.warn(msg, attempt.getExceptionCause());
+        }
+      }
+    }));
+  }
 }
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStepTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStepTest.java
index 5e5f2eaa6..48f5a329a 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStepTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStepTest.java
@@ -29,6 +29,8 @@ import org.testng.annotations.Test;
 
 import static org.mockito.Mockito.*;
 
+import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_TIMES;
+
 
 /** Tests for {@link 
org.apache.gobblin.data.management.copy.iceberg.IcebergRegisterStep} */
 public class IcebergRegisterStepTest {
@@ -41,13 +43,7 @@ public class IcebergRegisterStepTest {
     TableMetadata justPriorDestTableMetadata = mockTableMetadata("foo", "bar");
     TableMetadata readTimeSrcTableMetadata = 
Mockito.mock(TableMetadata.class); // (no mocked behavior)
     IcebergTable mockTable = mockIcebergTable("foo", "bar"); // matches!
-    IcebergRegisterStep regStep =
-        new IcebergRegisterStep(srcTableId, destTableId, 
readTimeSrcTableMetadata, justPriorDestTableMetadata, new Properties()) {
-          @Override
-          protected IcebergCatalog createDestinationCatalog() throws 
IOException {
-            return mockSingleTableIcebergCatalog(mockTable);
-          }
-        };
+    IcebergRegisterStep regStep = 
createIcebergRegisterStepInstance(readTimeSrcTableMetadata, 
justPriorDestTableMetadata, mockTable, new Properties());
     try {
       regStep.execute();
       Mockito.verify(mockTable, Mockito.times(1)).registerIcebergTable(any(), 
any());
@@ -75,6 +71,64 @@ public class IcebergRegisterStepTest {
     }
   }
 
+  @Test
+  public void testRegisterIcebergTableWithRetryer() throws IOException {
+    TableMetadata justPriorDestTableMetadata = mockTableMetadata("foo", "bar");
+    TableMetadata readTimeSrcTableMetadata = Mockito.mock(TableMetadata.class);
+    IcebergTable mockTable = mockIcebergTable("foo", "bar");
+    // Mocking registerIcebergTable() call to fail for first two attempts and 
then succeed
+    // So total number of invocations to registerIcebergTable() should be 3 
only
+    Mockito.doThrow(new RuntimeException())
+        .doThrow(new RuntimeException())
+        .doNothing()
+        .when(mockTable).registerIcebergTable(any(), any());
+    IcebergRegisterStep regStep = 
createIcebergRegisterStepInstance(readTimeSrcTableMetadata, 
justPriorDestTableMetadata, mockTable, new Properties());
+    try {
+      regStep.execute();
+      Mockito.verify(mockTable, Mockito.times(3)).registerIcebergTable(any(), 
any());
+    } catch (RuntimeException re) {
+      Assert.fail("Got Unexpected Runtime Exception", re);
+    }
+  }
+
+  @Test
+  public void testRegisterIcebergTableWithDefaultRetryerConfig() throws 
IOException {
+    TableMetadata justPriorDestTableMetadata = mockTableMetadata("foo", "bar");
+    TableMetadata readTimeSrcTableMetadata = Mockito.mock(TableMetadata.class);
+    IcebergTable mockTable = mockIcebergTable("foo", "bar");
+    // Mocking registerIcebergTable() call to always throw exception
+    Mockito.doThrow(new 
RuntimeException()).when(mockTable).registerIcebergTable(any(), any());
+    IcebergRegisterStep regStep = 
createIcebergRegisterStepInstance(readTimeSrcTableMetadata, 
justPriorDestTableMetadata, mockTable, new Properties());
+    try {
+      regStep.execute();
+      Assert.fail("Expected Runtime Exception");
+    } catch (RuntimeException re) {
+      // The default number of retries is 5 so register iceberg table should 
fail after retrying for 5 times
+      assertRetryTimes(re, 5);
+    }
+  }
+
+  @Test
+  public void testRegisterIcebergTableWithOverrideRetryerConfig() throws 
IOException {
+    TableMetadata justPriorDestTableMetadata = mockTableMetadata("foo", "bar");
+    TableMetadata readTimeSrcTableMetadata = Mockito.mock(TableMetadata.class);
+    IcebergTable mockTable = mockIcebergTable("foo", "bar");
+    // Mocking registerIcebergTable() call to always throw exception
+    Mockito.doThrow(new 
RuntimeException()).when(mockTable).registerIcebergTable(any(), any());
+    Properties properties = new Properties();
+    String retryCount = "10";
+    // Changing the number of retries to 10
+    properties.setProperty(IcebergRegisterStep.RETRYER_CONFIG_PREFIX + "." + 
RETRY_TIMES, retryCount);
+    IcebergRegisterStep regStep = 
createIcebergRegisterStepInstance(readTimeSrcTableMetadata, 
justPriorDestTableMetadata, mockTable, properties);
+    try {
+      regStep.execute();
+      Assert.fail("Expected Runtime Exception");
+    } catch (RuntimeException re) {
+      // register iceberg table should fail after retrying for retryCount 
times mentioned above
+      assertRetryTimes(re, Integer.parseInt(retryCount));
+    }
+  }
+
   protected TableMetadata mockTableMetadata(String uuid, String 
metadataFileLocation) throws IOException {
     TableMetadata mockMetadata = Mockito.mock(TableMetadata.class);
     Mockito.when(mockMetadata.uuid()).thenReturn(uuid);
@@ -100,4 +154,21 @@ public class IcebergRegisterStepTest {
     Mockito.when(catalog.openTable(any())).thenReturn(mockTable);
     return catalog;
   }
+
+  private IcebergRegisterStep createIcebergRegisterStepInstance(TableMetadata 
readTimeSrcTableMetadata,
+                                                                TableMetadata 
justPriorDestTableMetadata,
+                                                                IcebergTable 
mockTable,
+                                                                Properties 
properties) {
+    return new IcebergRegisterStep(srcTableId, destTableId, 
readTimeSrcTableMetadata, justPriorDestTableMetadata, properties) {
+      @Override
+      protected IcebergCatalog createDestinationCatalog() throws IOException {
+        return mockSingleTableIcebergCatalog(mockTable);
+      }
+    };
+  }
+
+  private void assertRetryTimes(RuntimeException re, Integer retryTimes) {
+    String msg = String.format("Failed to register iceberg table : (src: {%s}) - (dest: {%s}) : (retried %d times)", srcTableId, destTableId, retryTimes);
+    Assert.assertTrue(re.getMessage().startsWith(msg), re.getMessage());
+  }
 }

Reply via email to