phet commented on code in PR #3976:
URL: https://github.com/apache/gobblin/pull/3976#discussion_r1648405152


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java:
##########
@@ -54,7 +66,12 @@ public class IcebergRegisterStep implements CommitStep {
   private final TableMetadata readTimeSrcTableMetadata;
   private final TableMetadata justPriorDestTableMetadata;
   private final Properties properties;
-  private final Integer MAX_NUMBER_OF_ATTEMPTS = 3;
+  public static final String ICEBERG_REGISTER_STEP_PREFIX = "icebergRegisterStep";

Review Comment:
   "fully qualify" this by prefixing w/ `IcebergDatasetFinder.ICEBERG_DATASET_PREFIX`.
   
   then name it more semantically, like:
   ```
   RETRYER_CONFIG_PREFIX = IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".catalog.registration.retries";
   ```



##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStepTest.java:
##########
@@ -75,6 +75,48 @@ protected IcebergCatalog createDestinationCatalog() throws IOException {
     }
   }
 
+  @Test
+  public void testRegisterIcebergTableWithRetryer() throws IOException {
+    TableMetadata justPriorDestTableMetadata = mockTableMetadata("foo", "bar");
+    TableMetadata readTimeSrcTableMetadata = Mockito.mock(TableMetadata.class);
+    IcebergTable mockTable = mockIcebergTable("foo", "bar");
+    Mockito.doThrow(new RuntimeException()).doThrow(new RuntimeException()).doNothing().when(mockTable).registerIcebergTable(any(), any());

Review Comment:
   nit: for clarity, put each of the `.doX` behaviors on its own line. also add a comment above to the effect of "fail the first two times, then succeed"
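A stdlib-only sketch of the behavior that stub is meant to encode ("fail the first two times, then succeed"), runnable without Mockito. `FlakyRegistrar` and `SimpleRetry` are hypothetical stand-ins for the mocked `IcebergTable` and the guava-retrying `Retryer`, not Gobblin or Mockito APIs; the comment at the top shows the one-behavior-per-line Mockito layout the nit asks for.

```java
// Hypothetical stand-in for the Mockito-stubbed IcebergTable. With Mockito, the
// equivalent stub, one behavior per line, would read:
//
//   // fail the first two times, then succeed
//   Mockito.doThrow(new RuntimeException())
//       .doThrow(new RuntimeException())
//       .doNothing()
//       .when(mockTable).registerIcebergTable(any(), any());
final class FlakyRegistrar {
  private final int failuresBeforeSuccess;
  private int calls;

  FlakyRegistrar(int failuresBeforeSuccess) {
    this.failuresBeforeSuccess = failuresBeforeSuccess;
  }

  // Throws on each of the first `failuresBeforeSuccess` calls, then returns normally.
  void register() {
    calls++;
    if (calls <= failuresBeforeSuccess) {
      throw new RuntimeException("simulated registration failure #" + calls);
    }
  }

  int calls() {
    return calls;
  }
}

// Hypothetical bounded retry, standing in for a guava-retrying Retryer built with
// retryIfException() and stopAfterAttempt(maxAttempts).
final class SimpleRetry {
  static void retry(Runnable action, int maxAttempts) {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be >= 1");
    }
    RuntimeException last = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        action.run();
        return;
      } catch (RuntimeException e) {
        last = e; // remember the failure and try again
      }
    }
    throw last; // every attempt failed
  }
}
```

A test can then assert on `calls()`, much as the PR's first test does with `Mockito.verify(mockTable, Mockito.times(3))`.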



##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStepTest.java:
##########
@@ -75,6 +75,48 @@ protected IcebergCatalog createDestinationCatalog() throws IOException {
     }
   }
 
+  @Test
+  public void testRegisterIcebergTableWithRetryer() throws IOException {
+    TableMetadata justPriorDestTableMetadata = mockTableMetadata("foo", "bar");
+    TableMetadata readTimeSrcTableMetadata = Mockito.mock(TableMetadata.class);
+    IcebergTable mockTable = mockIcebergTable("foo", "bar");
+    Mockito.doThrow(new RuntimeException()).doThrow(new RuntimeException()).doNothing().when(mockTable).registerIcebergTable(any(), any());
+    IcebergRegisterStep regStep =
+        new IcebergRegisterStep(srcTableId, destTableId, readTimeSrcTableMetadata, justPriorDestTableMetadata, new Properties()) {
+          @Override
+          protected IcebergCatalog createDestinationCatalog() throws IOException {
+            return mockSingleTableIcebergCatalog(mockTable);
+          }
+        };
+    try {
+      regStep.execute();
+      Mockito.verify(mockTable, Mockito.times(3)).registerIcebergTable(any(), any());
+    } catch (RuntimeException re) {
+      Assert.fail("Got Unexpected Runtime Exception", re);
+    }
+  }
+
+  @Test
+  public void testRegisterIcebergTableWithRetryerThrowsRuntimeException() throws IOException {
+    TableMetadata justPriorDestTableMetadata = mockTableMetadata("foo", "bar");
+    TableMetadata readTimeSrcTableMetadata = Mockito.mock(TableMetadata.class);
+    IcebergTable mockTable = mockIcebergTable("foo", "bar");
+    Mockito.doThrow(new RuntimeException()).when(mockTable).registerIcebergTable(any(), any());
+    IcebergRegisterStep regStep =
+        new IcebergRegisterStep(srcTableId, destTableId, readTimeSrcTableMetadata, justPriorDestTableMetadata, new Properties()) {
+          @Override
+          protected IcebergCatalog createDestinationCatalog() throws IOException {
+            return mockSingleTableIcebergCatalog(mockTable);
+          }
+        };
+    try {
+      regStep.execute();
+      Assert.fail("Expected Runtime Exception");
+    } catch (RuntimeException re) {
+      Assert.assertTrue(re.getMessage().startsWith("Failed to register iceberg table (retried"), re.getMessage());

Review Comment:
   let's combine this test and perform it w/ and w/o passing an overriding retryer config. e.g. specify that it should retry N times and then look for that number in the exception message text
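One way the combined test might look, sketched with the JDK only: the same always-failing registration runs once without an override (expecting the default of 3 attempts, matching the original `MAX_NUMBER_OF_ATTEMPTS`) and once per overridden N, and each run checks that N appears in the exception message. The config key, `registerWithRetries`, and the message format beyond its `"Failed to register iceberg table (retried"` prefix are assumptions for illustration; in the real TestNG class the cases would more naturally come from a `@DataProvider`.

```java
import java.util.Properties;

final class RetryCountSketch {
  // Hypothetical key; assumes ICEBERG_DATASET_PREFIX resolves to "iceberg.dataset".
  static final String MAX_ATTEMPTS_KEY = "iceberg.dataset.catalog.registration.retries.max_attempts";
  // Default matching the original hard-coded MAX_NUMBER_OF_ATTEMPTS.
  static final int DEFAULT_MAX_ATTEMPTS = 3;

  static int maxAttempts(Properties props) {
    return Integer.parseInt(props.getProperty(MAX_ATTEMPTS_KEY, Integer.toString(DEFAULT_MAX_ATTEMPTS)));
  }

  // Runs `register` up to maxAttempts(props) times; once exhausted, throws with the count in the message.
  static void registerWithRetries(Runnable register, Properties props) {
    int attempts = maxAttempts(props);
    RuntimeException last = null;
    for (int i = 0; i < attempts; i++) {
      try {
        register.run();
        return;
      } catch (RuntimeException e) {
        last = e;
      }
    }
    throw new RuntimeException(
        String.format("Failed to register iceberg table (retried %d times)", attempts), last);
  }

  // The combined check: pass null for "no override", or N to override the attempt count.
  static void checkAlwaysFailingRegistration(Integer overrideAttempts) {
    Properties props = new Properties();
    if (overrideAttempts != null) {
      props.setProperty(MAX_ATTEMPTS_KEY, overrideAttempts.toString());
    }
    int expected = (overrideAttempts != null) ? overrideAttempts : DEFAULT_MAX_ATTEMPTS;
    int[] calls = {0};
    try {
      registerWithRetries(() -> {
        calls[0]++;
        throw new RuntimeException("simulated registration failure");
      }, props);
    } catch (RuntimeException re) {
      if (!re.getMessage().startsWith("Failed to register iceberg table (retried " + expected + " times")) {
        throw new AssertionError("unexpected message: " + re.getMessage());
      }
      if (calls[0] != expected) {
        throw new AssertionError("expected " + expected + " calls, got " + calls[0]);
      }
      return;
    }
    throw new AssertionError("expected a RuntimeException");
  }
}
```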



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java:
##########
@@ -91,9 +108,7 @@ public void execute() throws IOException {
       if (!isJustPriorDestMetadataStillCurrent) {
         throw new IOException("error: likely concurrent writing to destination: " + determinationMsg);
       }
-      Retryer<Void> registerRetryer = RetryerBuilder.<Void>newBuilder()
-          .retryIfException()
-          .withStopStrategy(StopStrategies.stopAfterAttempt(MAX_NUMBER_OF_ATTEMPTS)).build();
+      Retryer<Void> registerRetryer = createRegisterRetryer(ConfigFactory.load());

Review Comment:
   rather than `.load()` turn `this.properties` into a `Config`.
   
   see the `IcebergDatasetFinder` that `CopySource` [instantiates](https://github.com/apache/gobblin/blob/d9f83e638b6eb34fa85b0317eee31cf39c1e8546/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java#L191): it receives the original properties of the `SourceState` that the enclosing gobblin job is launched with.
   
   they arrive here via the `IcebergDatasetFinder` creating an `IcebergDataset` that [instantiates this class](https://github.com/apache/gobblin/blob/d9f83e638b6eb34fa85b0317eee31cf39c1e8546/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java#L384).
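A JDK-only sketch of the point: retry settings are looked up in the job's own `Properties` (the ones passed to this class's constructor) rather than in whatever `ConfigFactory.load()` picks up from the classpath and JVM system properties. In Gobblin itself the conversion would presumably be `ConfigUtils.propertiesToConfig(this.properties)` (or Typesafe's `ConfigFactory.parseProperties(this.properties)`); the prefix value and the `retryerProps` helper below are hypothetical.

```java
import java.util.Properties;

final class JobScopedRetryConfig {
  // Hypothetical prefix; assumes IcebergDatasetFinder.ICEBERG_DATASET_PREFIX == "iceberg.dataset".
  static final String RETRYER_CONFIG_PREFIX = "iceberg.dataset.catalog.registration.retries";

  // Returns only the job properties under RETRYER_CONFIG_PREFIX, with the prefix stripped,
  // roughly what a prefix-scoped view of a Config built from this.properties would expose.
  static Properties retryerProps(Properties jobProps) {
    Properties scoped = new Properties();
    String prefix = RETRYER_CONFIG_PREFIX + ".";
    for (String key : jobProps.stringPropertyNames()) {
      if (key.startsWith(prefix)) {
        scoped.setProperty(key.substring(prefix.length()), jobProps.getProperty(key));
      }
    }
    return scoped;
  }
}
```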



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java:
##########
@@ -102,9 +117,19 @@ public void execute() throws IOException {
       String msg = "Destination table (with TableMetadata) does not exist: " + tnfe.getMessage();
       log.error(msg);
       throw new IOException(msg, tnfe);
-    } catch (ExecutionException | RetryException e) {
-      log.error("Exception Encountered while Registering Iceberg Table", e);
-      throw new RuntimeException(e);
+    } catch (ExecutionException executionException) {
+      String msg = "Failed to register iceberg table";
+      log.error(msg, executionException);

Review Comment:
   given this is a heavily concurrent system and we regularly analyze log msgs perhaps across multiple runs, every log message must carry context. for that reason, this class already has the members `srcTableIdStr` and `destTableIdStr`. follow how the latter is used on L103
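A sketch of the `ExecutionException` branch carrying table context in both the log line and the rethrown exception. The `~id~` bracketing is only a placeholder assumption; the real format should copy the existing `destTableIdStr` usage in this class. The helper name is hypothetical.

```java
import java.util.concurrent.ExecutionException;

final class RegistrationFailures {
  // Hypothetical helper: builds a context-bearing exception for a failed registration.
  // In IcebergRegisterStep, the same `msg` would also go to log.error(msg, executionException).
  static RuntimeException fromExecutionException(
      String srcTableIdStr, String destTableIdStr, ExecutionException executionException) {
    String msg = String.format("~%s~ Failed to register iceberg table from ~%s~", destTableIdStr, srcTableIdStr);
    // Unwrap, as the PR does, so callers see the catalog's own exception as the cause.
    return new RuntimeException(msg, executionException.getCause());
  }
}
```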



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java:
##########
@@ -102,9 +117,19 @@ public void execute() throws IOException {
       String msg = "Destination table (with TableMetadata) does not exist: " + tnfe.getMessage();
       log.error(msg);
       throw new IOException(msg, tnfe);
-    } catch (ExecutionException | RetryException e) {
-      log.error("Exception Encountered while Registering Iceberg Table", e);
-      throw new RuntimeException(e);
+    } catch (ExecutionException executionException) {
+      String msg = "Failed to register iceberg table";
+      log.error(msg, executionException);
+      throw new RuntimeException(msg, executionException.getCause());
+    } catch (RetryException retryException) {
+      String interruptedNote = Thread.currentThread().isInterrupted() ? "... then interrupted" : "";
+      String msg = String.format("Failed to register iceberg table (retried %d times %s)",

Review Comment:
   same here about table context
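Likewise for the `RetryException` branch, a sketch of a message carrying both table ids, the attempt count, and the interrupt note. In guava-retrying, the count is available from `RetryException.getNumberOfFailedAttempts()`; the format and helper name are assumptions. Putting the separator inside the note (`", then interrupted"`) also avoids the dangling space that `"(retried %d times %s)"` leaves before `)` when the thread was not interrupted.

```java
final class RetryExhaustedMessage {
  // Hypothetical helper; `attempts` would come from retryException.getNumberOfFailedAttempts()
  // and `interrupted` from Thread.currentThread().isInterrupted().
  static String build(String srcTableIdStr, String destTableIdStr, int attempts, boolean interrupted) {
    String interruptedNote = interrupted ? ", then interrupted" : "";
    return String.format("~%s~ Failed to register iceberg table from ~%s~ (retried %d times%s)",
        destTableIdStr, srcTableIdStr, attempts, interruptedNote);
  }
}
```

With a context prefix, the test's `startsWith("Failed to register iceberg table (retried")` check would need to become a `contains` check.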



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
