[https://issues.apache.org/jira/browse/GOBBLIN-2088?focusedWorklogId=924005&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-924005]
ASF GitHub Bot logged work on GOBBLIN-2088:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 21/Jun/24 14:28
Start Date: 21/Jun/24 14:28
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #3976:
URL: https://github.com/apache/gobblin/pull/3976#discussion_r1648405152
##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStepTest.java:
##########
@@ -75,6 +75,48 @@ protected IcebergCatalog createDestinationCatalog() throws IOException {
}
}
+ @Test
+ public void testRegisterIcebergTableWithRetryer() throws IOException {
+ TableMetadata justPriorDestTableMetadata = mockTableMetadata("foo", "bar");
+ TableMetadata readTimeSrcTableMetadata = Mockito.mock(TableMetadata.class);
+ IcebergTable mockTable = mockIcebergTable("foo", "bar");
+ Mockito.doThrow(new RuntimeException()).doThrow(new RuntimeException()).doNothing().when(mockTable).registerIcebergTable(any(), any());
Review Comment:
nit: for clarity, put each of the `.doX` behaviors on its own line. also
add a comment above to the effect of "fail the first two times, then succeed"
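Mockito applies a chain like `doThrow(..).doThrow(..).doNothing()` to consecutive invocations, and the last behavior keeps applying to any further calls. Since Mockito can't be pulled into a self-contained snippet, here is a stdlib-only stand-in (the `ScriptedRegistration` class is hypothetical, not part of the PR) that spells out the "fail the first two times, then succeed" sequence one behavior per line, as the nit suggests:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Mockito's doThrow(..).doThrow(..).doNothing() chain.
class ScriptedRegistration {
  private final List<RuntimeException> script = new ArrayList<>(); // null entry = succeed
  private int calls = 0;

  ScriptedRegistration thenThrow(RuntimeException e) { script.add(e); return this; }
  ScriptedRegistration thenSucceed() { script.add(null); return this; }

  void register() {
    calls++;
    if (script.isEmpty()) {
      return; // nothing scripted: behave like a no-op
    }
    // Once the script is exhausted, keep repeating the last behavior (as Mockito does).
    RuntimeException e = script.get(Math.min(calls, script.size()) - 1);
    if (e != null) {
      throw e;
    }
  }

  int calls() { return calls; }

  public static void main(String[] args) {
    // fail the first two times, then succeed
    ScriptedRegistration reg = new ScriptedRegistration()
        .thenThrow(new RuntimeException("attempt 1 fails"))
        .thenThrow(new RuntimeException("attempt 2 fails"))
        .thenSucceed();
    for (int i = 1; i <= 4; i++) {
      try {
        reg.register();
        System.out.println("call " + i + ": ok");
      } catch (RuntimeException e) {
        System.out.println("call " + i + ": " + e.getMessage());
      }
    }
  }
}
```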
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java:
##########
@@ -102,9 +117,19 @@ public void execute() throws IOException {
String msg = "Destination table (with TableMetadata) does not exist: " + tnfe.getMessage();
log.error(msg);
throw new IOException(msg, tnfe);
- } catch (ExecutionException | RetryException e) {
- log.error("Exception Encountered while Registering Iceberg Table", e);
- throw new RuntimeException(e);
+ } catch (ExecutionException executionException) {
+ String msg = "Failed to register iceberg table";
+ log.error(msg, executionException);
Review Comment:
given this is a heavily concurrent system and we regularly analyze log msgs
perhaps across multiple runs, every log message must carry context. for that
reason, this class already has the members `srcTableIdStr` and
`destTableIdStr`. follow how the latter is used on L103
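A minimal sketch of carrying table context in every message, assuming a hypothetical `withTableContext` helper. The `[src -> dest]` format is illustrative only and may not match how `destTableIdStr` is already used in the class; the `ExecutionException` handling mirrors the branch in the diff, where one contextual `msg` serves both the log line and the rethrown exception.

```java
import java.util.concurrent.ExecutionException;

// Hypothetical helpers; srcTableIdStr / destTableIdStr mirror the IcebergRegisterStep members.
class TableContextMessages {
  static String withTableContext(String srcTableIdStr, String destTableIdStr, String msg) {
    return String.format("[%s -> %s] %s", srcTableIdStr, destTableIdStr, msg);
  }

  // Same contextual msg for the log line and the rethrown exception; the wrapped cause is unwrapped.
  static RuntimeException toRuntime(String srcTableIdStr, String destTableIdStr, ExecutionException ee) {
    String msg = withTableContext(srcTableIdStr, destTableIdStr, "Failed to register iceberg table");
    System.err.println(msg); // stand-in for log.error(msg, ee)
    return new RuntimeException(msg, ee.getCause());
  }

  public static void main(String[] args) {
    RuntimeException re = toRuntime("db_src.tbl", "db_dest.tbl",
        new ExecutionException(new IllegalStateException("catalog unavailable")));
    System.out.println(re.getMessage()); // [db_src.tbl -> db_dest.tbl] Failed to register iceberg table
    System.out.println(re.getCause().getMessage()); // catalog unavailable
  }
}
```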
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java:
##########
@@ -102,9 +117,19 @@ public void execute() throws IOException {
String msg = "Destination table (with TableMetadata) does not exist: " + tnfe.getMessage();
log.error(msg);
throw new IOException(msg, tnfe);
- } catch (ExecutionException | RetryException e) {
- log.error("Exception Encountered while Registering Iceberg Table", e);
- throw new RuntimeException(e);
+ } catch (ExecutionException executionException) {
+ String msg = "Failed to register iceberg table";
+ log.error(msg, executionException);
+ throw new RuntimeException(msg, executionException.getCause());
+ } catch (RetryException retryException) {
+ String interruptedNote = Thread.currentThread().isInterrupted() ? "... then interrupted" : "";
+ String msg = String.format("Failed to register iceberg table (retried %d times %s)",
Review Comment:
same here about table context
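The same idea applied to the `RetryException` branch, sketched with a hypothetical `retryFailureMsg` helper. It appends the table context after the parenthetical, so the test's `startsWith("Failed to register iceberg table (retried")` check would still pass (prefixing the context instead would mean updating that assertion). It also moves the separating space into the interrupted note, because the diff's `"retried %d times %s"` renders as `(retried 3 times )` when the note is empty.

```java
// Hypothetical helper; the table ID strings mirror the IcebergRegisterStep members.
class RetryFailureMessages {
  // ASSUMPTION: `attempts` would come from the retry library, e.g. guava-retrying's
  // RetryException#getNumberOfFailedAttempts().
  static String retryFailureMsg(String srcTableIdStr, String destTableIdStr,
      int attempts, boolean interrupted) {
    String interruptedNote = interrupted ? " ... then interrupted" : "";
    // Context goes last so the fixed "Failed to register iceberg table (retried" prefix is preserved.
    return String.format("Failed to register iceberg table (retried %d times%s) [%s -> %s]",
        attempts, interruptedNote, srcTableIdStr, destTableIdStr);
  }

  public static void main(String[] args) {
    System.out.println(retryFailureMsg("db_src.tbl", "db_dest.tbl", 3, false));
    System.out.println(retryFailureMsg("db_src.tbl", "db_dest.tbl", 3, true));
  }
}
```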
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java:
##########
@@ -54,7 +66,12 @@ public class IcebergRegisterStep implements CommitStep {
private final TableMetadata readTimeSrcTableMetadata;
private final TableMetadata justPriorDestTableMetadata;
private final Properties properties;
- private final Integer MAX_NUMBER_OF_ATTEMPTS = 3;
+ public static final String ICEBERG_REGISTER_STEP_PREFIX = "icebergRegisterStep";
Review Comment:
"fully qualify" this by prefixing w/ `IcebergDatasetFinder.ICEBERG_DATASET_PREFIX`.
then name it more semantically, like:
```
RETRYER_CONFIG_PREFIX = IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".catalog.registration.retries";
```
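A minimal stdlib-only sketch of why the leading `.` in the suffix matters. It assumes `IcebergDatasetFinder.ICEBERG_DATASET_PREFIX` resolves to `"iceberg.dataset"` with no trailing dot (an assumption; check the real constant), and `retry_times` is a hypothetical sub-key:

```java
import java.util.Properties;

// Sketch only: the prefix value below is an ASSUMPTION standing in for
// IcebergDatasetFinder.ICEBERG_DATASET_PREFIX; verify the real constant.
class RetryerConfigKeys {
  static final String ICEBERG_DATASET_PREFIX = "iceberg.dataset";
  // The leading "." keeps the segments apart; without it the key would read
  // "iceberg.datasetcatalog.registration.retries".
  static final String RETRYER_CONFIG_PREFIX =
      ICEBERG_DATASET_PREFIX + ".catalog.registration.retries";

  // Hypothetical helper: reads "<RETRYER_CONFIG_PREFIX>.<subKey>" as an int, else the default.
  static int readIntOrDefault(Properties props, String subKey, int defaultValue) {
    String value = props.getProperty(RETRYER_CONFIG_PREFIX + "." + subKey);
    return value == null ? defaultValue : Integer.parseInt(value.trim());
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty(RETRYER_CONFIG_PREFIX + ".retry_times", "5"); // hypothetical sub-key
    System.out.println(RETRYER_CONFIG_PREFIX); // iceberg.dataset.catalog.registration.retries
    System.out.println(readIntOrDefault(props, "retry_times", 3)); // 5
    System.out.println(readIntOrDefault(new Properties(), "retry_times", 3)); // 3
  }
}
```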
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java:
##########
@@ -91,9 +108,7 @@ public void execute() throws IOException {
if (!isJustPriorDestMetadataStillCurrent) {
throw new IOException("error: likely concurrent writing to destination: " + determinationMsg);
}
- Retryer<Void> registerRetryer = RetryerBuilder.<Void>newBuilder()
- .retryIfException()
- .withStopStrategy(StopStrategies.stopAfterAttempt(MAX_NUMBER_OF_ATTEMPTS)).build();
+ Retryer<Void> registerRetryer = createRegisterRetryer(ConfigFactory.load());
Review Comment:
rather than `.load()` turn `this.properties` into a `Config`.
see the `IcebergDatasetFinder` that `CopySource`
[instantiates](https://github.com/apache/gobblin/blob/d9f83e638b6eb34fa85b0317eee31cf39c1e8546/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java#L191):
those are the original properties of the `SourceState` that the enclosing
gobblin job is launched with.
they arrive here via the `IcebergDatasetFinder` creating an `IcebergDataset`
that [instantiates this
class](https://github.com/apache/gobblin/blob/d9f83e638b6eb34fa85b0317eee31cf39c1e8546/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java#L384).
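For reference, Typesafe Config's `ConfigFactory.parseProperties(this.properties)` builds a `Config` from exactly the job's properties, whereas `ConfigFactory.load()` reads the process-wide defaults (system properties, `application.conf`, `reference.conf`) and so won't see settings passed only via the job's `SourceState`. The stdlib-only sketch below (hypothetical `scopeTo` helper) shows the same idea with plain `Properties`: start from the job's own properties and narrow them to the retryer prefix, roughly what `Config.getConfig(prefix)` would do on the parsed config.

```java
import java.util.Properties;

// Hypothetical helper: narrow job properties to the keys under one prefix.
class ScopedProperties {
  // Returns a new Properties holding each "prefix.x" entry re-keyed as "x".
  static Properties scopeTo(Properties jobProps, String prefix) {
    String dotted = prefix.endsWith(".") ? prefix : prefix + ".";
    Properties scoped = new Properties();
    for (String key : jobProps.stringPropertyNames()) {
      if (key.startsWith(dotted)) {
        scoped.setProperty(key.substring(dotted.length()), jobProps.getProperty(key));
      }
    }
    return scoped;
  }

  public static void main(String[] args) {
    Properties jobProps = new Properties(); // stands in for the SourceState properties
    jobProps.setProperty("iceberg.dataset.catalog.registration.retries.retry_times", "4");
    jobProps.setProperty("some.unrelated.key", "x");
    Properties retryProps = scopeTo(jobProps, "iceberg.dataset.catalog.registration.retries");
    System.out.println(retryProps.getProperty("retry_times")); // 4
    System.out.println(retryProps.size()); // 1
  }
}
```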
##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStepTest.java:
##########
@@ -75,6 +75,48 @@ protected IcebergCatalog createDestinationCatalog() throws IOException {
}
}
+ @Test
+ public void testRegisterIcebergTableWithRetryer() throws IOException {
+ TableMetadata justPriorDestTableMetadata = mockTableMetadata("foo", "bar");
+ TableMetadata readTimeSrcTableMetadata = Mockito.mock(TableMetadata.class);
+ IcebergTable mockTable = mockIcebergTable("foo", "bar");
+ Mockito.doThrow(new RuntimeException()).doThrow(new RuntimeException()).doNothing().when(mockTable).registerIcebergTable(any(), any());
+ IcebergRegisterStep regStep =
+ new IcebergRegisterStep(srcTableId, destTableId, readTimeSrcTableMetadata, justPriorDestTableMetadata, new Properties()) {
+ @Override
+ protected IcebergCatalog createDestinationCatalog() throws IOException {
+ return mockSingleTableIcebergCatalog(mockTable);
+ }
+ };
+ try {
+ regStep.execute();
+ Mockito.verify(mockTable, Mockito.times(3)).registerIcebergTable(any(), any());
+ } catch (RuntimeException re) {
+ Assert.fail("Got Unexpected Runtime Exception", re);
+ }
+ }
+
+ @Test
+ public void testRegisterIcebergTableWithRetryerThrowsRuntimeException() throws IOException {
+ TableMetadata justPriorDestTableMetadata = mockTableMetadata("foo", "bar");
+ TableMetadata readTimeSrcTableMetadata = Mockito.mock(TableMetadata.class);
+ IcebergTable mockTable = mockIcebergTable("foo", "bar");
+ Mockito.doThrow(new RuntimeException()).when(mockTable).registerIcebergTable(any(), any());
+ IcebergRegisterStep regStep =
+ new IcebergRegisterStep(srcTableId, destTableId, readTimeSrcTableMetadata, justPriorDestTableMetadata, new Properties()) {
+ @Override
+ protected IcebergCatalog createDestinationCatalog() throws IOException {
+ return mockSingleTableIcebergCatalog(mockTable);
+ }
+ };
+ try {
+ regStep.execute();
+ Assert.fail("Expected Runtime Exception");
+ } catch (RuntimeException re) {
+ Assert.assertTrue(re.getMessage().startsWith("Failed to register iceberg table (retried"), re.getMessage());
Review Comment:
let's combine this test and run it both w/ and w/o an overriding retryer
config, e.g. specify that it should retry N times and then look for that
number in the exception message text
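One way to read this suggestion, sketched without guava-retrying or the test framework: a bounded retry loop (hypothetical `runWithRetries`, standing in for the configured `Retryer`) whose failure message embeds the attempt count, driven once with the default and once with an overriding value so a single check covers both. The key `retries.max_attempts` is hypothetical, and the default of 3 mirrors the removed `MAX_NUMBER_OF_ATTEMPTS`.

```java
import java.util.Properties;

// Hypothetical sketch: bounded retry whose final failure message reports the attempt count.
class RetryCountMessage {
  static final String MAX_ATTEMPTS_KEY = "retries.max_attempts"; // hypothetical key
  static final int DEFAULT_MAX_ATTEMPTS = 3; // mirrors the removed MAX_NUMBER_OF_ATTEMPTS

  static int maxAttempts(Properties props) {
    String v = props.getProperty(MAX_ATTEMPTS_KEY);
    return v == null ? DEFAULT_MAX_ATTEMPTS : Integer.parseInt(v.trim());
  }

  // Runs the action up to maxAttempts times; on final failure, rethrows with the count
  // (here simply the total number of attempts made) and the last failure as the cause.
  static void runWithRetries(Runnable action, int maxAttempts) {
    RuntimeException last = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        action.run();
        return;
      } catch (RuntimeException e) {
        last = e;
      }
    }
    throw new RuntimeException(
        String.format("Failed to register iceberg table (retried %d times)", maxAttempts), last);
  }

  // Drives an always-failing action; records the calls and returns the resulting message.
  static String failureMessage(Properties props, int[] calls) {
    try {
      runWithRetries(() -> {
        calls[0]++;
        throw new RuntimeException("simulated catalog failure");
      }, maxAttempts(props));
      return null; // not reached for an always-failing action
    } catch (RuntimeException e) {
      return e.getMessage();
    }
  }

  public static void main(String[] args) {
    Properties overriding = new Properties();
    overriding.setProperty(MAX_ATTEMPTS_KEY, "5");
    // Same check with and without the overriding config.
    for (Properties props : new Properties[] {new Properties(), overriding}) {
      int expected = maxAttempts(props);
      int[] calls = {0};
      String msg = failureMessage(props, calls);
      boolean ok = calls[0] == expected && msg.contains("(retried " + expected + " times)");
      System.out.println(expected + " attempts -> " + (ok ? "as expected" : "MISMATCH: " + msg));
    }
  }
}
```

In the actual test, the overriding value would presumably travel through the `Properties` handed to `IcebergRegisterStep`, with the assertion made on the caught exception's message.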
Issue Time Tracking
-------------------
Worklog Id: (was: 924005)
Time Spent: 1h 10m (was: 1h)
> Add retries to OH replication final catalog commit
> --------------------------------------------------
>
> Key: GOBBLIN-2088
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2088
> Project: Apache Gobblin
> Issue Type: Task
> Reporter: Vivek Rai
> Priority: Major
> Time Spent: 1h 10m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)