This is an automated email from the ASF dual-hosted git repository.
arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new b44a91ea6 [GOBBLIN-2088] Add retry to OH replication final catalog commit (#3976)
b44a91ea6 is described below
commit b44a91ea69dded11fc2229237026a575ec94603a
Author: Vivek Rai <[email protected]>
AuthorDate: Thu Jul 11 02:37:37 2024 +0530
[GOBBLIN-2088] Add retry to OH replication final catalog commit (#3976)
* added retries to OH replication final catalog commit
* modified code to use retryerfactory with config
---
.../copy/iceberg/IcebergRegisterStep.java | 72 +++++++++++++++++-
.../copy/iceberg/IcebergRegisterStepTest.java | 85 ++++++++++++++++++++--
2 files changed, 147 insertions(+), 10 deletions(-)
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java
index ce7d11962..a2e9c80ee 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java
@@ -18,16 +18,33 @@
package org.apache.gobblin.data.management.copy.iceberg;
import java.io.IOException;
+import java.time.Duration;
+import java.util.Optional;
import java.util.Properties;
-
-import com.google.common.annotations.VisibleForTesting;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutionException;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.catalog.TableIdentifier;
+import com.github.rholder.retry.Attempt;
+import com.github.rholder.retry.Retryer;
+import com.github.rholder.retry.RetryListener;
+import com.github.rholder.retry.RetryException;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.annotations.VisibleForTesting;
+
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.commit.CommitStep;
+import org.apache.gobblin.util.retry.RetryerFactory;
+
+import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_INTERVAL_MS;
+import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_TIMES;
+import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_TYPE;
+import static org.apache.gobblin.util.retry.RetryerFactory.RetryType;
/**
@@ -49,6 +66,12 @@ public class IcebergRegisterStep implements CommitStep {
private final TableMetadata readTimeSrcTableMetadata;
private final TableMetadata justPriorDestTableMetadata;
private final Properties properties;
+ public static final String RETRYER_CONFIG_PREFIX =
IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".catalog.registration.retries"; // prefix under which retryer overrides are looked up in the step's properties
+
+ private static final Config RETRYER_FALLBACK_CONFIG =
ConfigFactory.parseMap(ImmutableMap.of( // defaults supplied as the fallback beneath any configured overrides
+ RETRY_INTERVAL_MS, TimeUnit.SECONDS.toMillis(3L), // 3 seconds, expressed in milliseconds
+ RETRY_TIMES, 5,
+ RETRY_TYPE, RetryType.FIXED_ATTEMPT.name()));
public IcebergRegisterStep(TableIdentifier srcTableId, TableIdentifier
destTableId,
TableMetadata readTimeSrcTableMetadata, TableMetadata
justPriorDestTableMetadata,
@@ -85,11 +108,33 @@ public class IcebergRegisterStep implements CommitStep {
if (!isJustPriorDestMetadataStillCurrent) {
throw new IOException("error: likely concurrent writing to
destination: " + determinationMsg);
}
- destIcebergTable.registerIcebergTable(readTimeSrcTableMetadata,
currentDestMetadata);
+ Retryer<Void> registerRetryer = createRegisterRetryer();
+ registerRetryer.call(() -> {
+ destIcebergTable.registerIcebergTable(readTimeSrcTableMetadata,
currentDestMetadata);
+ return null;
+ });
} catch (IcebergTable.TableNotFoundException tnfe) {
String msg = "Destination table (with TableMetadata) does not exist: " +
tnfe.getMessage();
log.error(msg);
throw new IOException(msg, tnfe);
+ } catch (ExecutionException executionException) {
+ String msg = String.format("Failed to register iceberg table : (src:
{%s}) - (dest: {%s})",
+ this.srcTableIdStr,
+ this.destTableIdStr);
+ log.error(msg, executionException);
+ throw new RuntimeException(msg, executionException.getCause());
+ } catch (RetryException retryException) {
+ String interruptedNote = Thread.currentThread().isInterrupted() ? "...
then interrupted" : "";
+ String msg = String.format("Failed to register iceberg table : (src:
{%s}) - (dest: {%s}) : (retried %d times) %s ",
+ this.srcTableIdStr,
+ this.destTableIdStr,
+ retryException.getNumberOfFailedAttempts(),
+ interruptedNote);
+ Throwable informativeException =
retryException.getLastFailedAttempt().hasException()
+ ? retryException.getLastFailedAttempt().getExceptionCause()
+ : retryException;
+ log.error(msg, informativeException);
+ throw new RuntimeException(msg, informativeException);
}
}
@@ -103,4 +148,25 @@ public class IcebergRegisterStep implements CommitStep {
protected IcebergCatalog createDestinationCatalog() throws IOException { // Obtains the DESTINATION-location catalog from this step's properties; protected so a subclass (e.g. in tests) can substitute another catalog.
return IcebergDatasetFinder.createIcebergCatalog(this.properties,
IcebergDatasetFinder.CatalogLocation.DESTINATION);
}
+
+ private Retryer<Void> createRegisterRetryer() { // Builds the retryer used around the destination-table registration call.
+ Config config = ConfigFactory.parseProperties(this.properties);
+ Config retryerOverridesConfig =
config.hasPath(IcebergRegisterStep.RETRYER_CONFIG_PREFIX)
+ ? config.getConfig(IcebergRegisterStep.RETRYER_CONFIG_PREFIX)
+ : ConfigFactory.empty(); // nothing configured under the prefix: only the fallback values apply
+
+ return
RetryerFactory.newInstance(retryerOverridesConfig.withFallback(RETRYER_FALLBACK_CONFIG),
Optional.of(new RetryListener() { // configured overrides win over the fallback; the listener only logs
+ @Override
+ public <V> void onRetry(Attempt<V> attempt) {
+ if (attempt.hasException()) { // log only attempts that ended with an exception
+ String msg = String.format("Exception caught while registering
iceberg table : (src: {%s}) - (dest: {%s}) : [attempt: %d; %s after start]",
+ srcTableIdStr,
+ destTableIdStr,
+ attempt.getAttemptNumber(),
+
Duration.ofMillis(attempt.getDelaySinceFirstAttempt()).toString());
+ log.warn(msg, attempt.getExceptionCause());
+ }
+ }
+ }));
+ }
}
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStepTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStepTest.java
index 5e5f2eaa6..48f5a329a 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStepTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStepTest.java
@@ -29,6 +29,8 @@ import org.testng.annotations.Test;
import static org.mockito.Mockito.*;
+import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_TIMES;
+
/** Tests for {@link
org.apache.gobblin.data.management.copy.iceberg.IcebergRegisterStep} */
public class IcebergRegisterStepTest {
@@ -41,13 +43,7 @@ public class IcebergRegisterStepTest {
TableMetadata justPriorDestTableMetadata = mockTableMetadata("foo", "bar");
TableMetadata readTimeSrcTableMetadata =
Mockito.mock(TableMetadata.class); // (no mocked behavior)
IcebergTable mockTable = mockIcebergTable("foo", "bar"); // matches!
- IcebergRegisterStep regStep =
- new IcebergRegisterStep(srcTableId, destTableId,
readTimeSrcTableMetadata, justPriorDestTableMetadata, new Properties()) {
- @Override
- protected IcebergCatalog createDestinationCatalog() throws
IOException {
- return mockSingleTableIcebergCatalog(mockTable);
- }
- };
+ IcebergRegisterStep regStep =
createIcebergRegisterStepInstance(readTimeSrcTableMetadata,
justPriorDestTableMetadata, mockTable, new Properties());
try {
regStep.execute();
Mockito.verify(mockTable, Mockito.times(1)).registerIcebergTable(any(),
any());
@@ -75,6 +71,64 @@ public class IcebergRegisterStepTest {
}
}
+ @Test
+ public void testRegisterIcebergTableWithRetryer() throws IOException {
+ TableMetadata justPriorDestTableMetadata = mockTableMetadata("foo", "bar");
+ TableMetadata readTimeSrcTableMetadata = Mockito.mock(TableMetadata.class);
+ IcebergTable mockTable = mockIcebergTable("foo", "bar"); // same ("foo", "bar") arguments as the prior-dest metadata mock above
+ // Mocking registerIcebergTable() call to fail for first two attempts and
then succeed
+ // So total number of invocations to registerIcebergTable() should be 3
only
+ Mockito.doThrow(new RuntimeException())
+ .doThrow(new RuntimeException())
+ .doNothing()
+ .when(mockTable).registerIcebergTable(any(), any());
+ IcebergRegisterStep regStep =
createIcebergRegisterStepInstance(readTimeSrcTableMetadata,
justPriorDestTableMetadata, mockTable, new Properties()); // empty Properties: no retryer overrides supplied
+ try {
+ regStep.execute();
+ Mockito.verify(mockTable, Mockito.times(3)).registerIcebergTable(any(),
any());
+ } catch (RuntimeException re) {
+ Assert.fail("Got Unexpected Runtime Exception", re); // the third stubbed call does nothing, so execute() is expected to complete
+ }
+ }
+
+ @Test
+ public void testRegisterIcebergTableWithDefaultRetryerConfig() throws
IOException {
+ TableMetadata justPriorDestTableMetadata = mockTableMetadata("foo", "bar");
+ TableMetadata readTimeSrcTableMetadata = Mockito.mock(TableMetadata.class);
+ IcebergTable mockTable = mockIcebergTable("foo", "bar");
+ // Mocking registerIcebergTable() call to always throw exception
+ Mockito.doThrow(new
RuntimeException()).when(mockTable).registerIcebergTable(any(), any());
+ IcebergRegisterStep regStep =
createIcebergRegisterStepInstance(readTimeSrcTableMetadata,
justPriorDestTableMetadata, mockTable, new Properties()); // empty Properties: no retryer overrides supplied
+ try {
+ regStep.execute();
+ Assert.fail("Expected Runtime Exception"); // reached only if execute() returned without throwing
+ } catch (RuntimeException re) {
+ // The default number of retries is 5 so register iceberg table should
fail after retrying for 5 times
+ assertRetryTimes(re, 5);
+ }
+ }
+
+ @Test
+ public void testRegisterIcebergTableWithOverrideRetryerConfig() throws
IOException {
+ TableMetadata justPriorDestTableMetadata = mockTableMetadata("foo", "bar");
+ TableMetadata readTimeSrcTableMetadata = Mockito.mock(TableMetadata.class);
+ IcebergTable mockTable = mockIcebergTable("foo", "bar");
+ // Mocking registerIcebergTable() call to always throw exception
+ Mockito.doThrow(new
RuntimeException()).when(mockTable).registerIcebergTable(any(), any());
+ Properties properties = new Properties();
+ String retryCount = "10"; // held as a String because Properties.setProperty takes String values
+ // Changing the number of retries to 10
+ properties.setProperty(IcebergRegisterStep.RETRYER_CONFIG_PREFIX + "." +
RETRY_TIMES, retryCount);
+ IcebergRegisterStep regStep =
createIcebergRegisterStepInstance(readTimeSrcTableMetadata,
justPriorDestTableMetadata, mockTable, properties);
+ try {
+ regStep.execute();
+ Assert.fail("Expected Runtime Exception"); // reached only if execute() returned without throwing
+ } catch (RuntimeException re) {
+ // register iceberg table should fail after retrying for retryCount
times mentioned above
+ assertRetryTimes(re, Integer.parseInt(retryCount));
+ }
+ }
+
protected TableMetadata mockTableMetadata(String uuid, String
metadataFileLocation) throws IOException {
TableMetadata mockMetadata = Mockito.mock(TableMetadata.class);
Mockito.when(mockMetadata.uuid()).thenReturn(uuid);
@@ -100,4 +154,21 @@ public class IcebergRegisterStepTest {
Mockito.when(catalog.openTable(any())).thenReturn(mockTable);
return catalog;
}
+
+ private IcebergRegisterStep createIcebergRegisterStepInstance(TableMetadata
readTimeSrcTableMetadata,
+ TableMetadata
justPriorDestTableMetadata,
+ IcebergTable
mockTable,
+ Properties
properties) {
+ return new IcebergRegisterStep(srcTableId, destTableId,
readTimeSrcTableMetadata, justPriorDestTableMetadata, properties) { // anonymous subclass: swaps the destination catalog for one built around mockTable
+ @Override
+ protected IcebergCatalog createDestinationCatalog() throws IOException {
+ return mockSingleTableIcebergCatalog(mockTable);
+ }
+ };
+ }
+
+ private void assertRetryTimes(RuntimeException re, Integer retryTimes) { // Checks that re's message begins with the expected failure prefix embedding both table ids and the retry count.
+ String msg = String.format("Failed to register iceberg table : (src: {%s})
- (dest: {%s}) : (retried %d times)", srcTableId, destTableId, retryTimes);
+ Assert.assertTrue(re.getMessage().startsWith(msg), re.getMessage()); // second argument is the failure message, so the actual text is shown on mismatch
+ }
}