This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 5bdd73c  [GOBBLIN-1603] Throws error if configured when encountering an IO exception while collecting copy entities (#3460)
5bdd73c is described below

commit 5bdd73c5f61e7b2522d02261ca14c06e5d763611
Author: William Lo <[email protected]>
AuthorDate: Tue Feb 8 13:22:34 2022 -0800

    [GOBBLIN-1603] Throws error if configured when encountering an IO exception while collecting copy entities (#3460)
    
    * Throws error if configured when encountering an IO exception while 
collecting copy entities
    
    * Fix checkstyle
---
 .../data/management/copy/hive/HiveDataset.java     |  20 +++-
 .../data/management/copy/hive/HiveDatasetTest.java | 115 ++++++++++++++++-----
 2 files changed, 103 insertions(+), 32 deletions(-)

diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDataset.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDataset.java
index 319b980..7032a88 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDataset.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDataset.java
@@ -148,13 +148,16 @@ public class HiveDataset implements 
PrioritizedCopyableDataset {
   @Override
   public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, 
CopyConfiguration configuration)
       throws IOException {
-    if (!canCopyTable()) {
+    if (!canCopyTable(configuration)) {
       return Iterators.emptyIterator();
     }
     try {
       return new HiveCopyEntityHelper(this, configuration, 
targetFs).getCopyEntities(configuration);
     } catch (IOException ioe) {
       log.error("Failed to copy table " + this.table, ioe);
+      if (configuration.isAbortOnSingleDatasetFailure()) {
+        throw new RuntimeException(ioe);
+      }
       return Iterators.emptyIterator();
     }
   }
@@ -167,7 +170,7 @@ public class HiveDataset implements 
PrioritizedCopyableDataset {
   public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, 
CopyConfiguration configuration,
       Comparator<FileSet<CopyEntity>> prioritizer, 
PushDownRequestor<FileSet<CopyEntity>> requestor)
       throws IOException {
-    if (!canCopyTable()) {
+    if (!canCopyTable(configuration)) {
       return Iterators.emptyIterator();
     }
     try {
@@ -177,6 +180,9 @@ public class HiveDataset implements 
PrioritizedCopyableDataset {
       return fileSetList.iterator();
     } catch (IOException ioe) {
       log.error("Failed to copy table " + this.table, ioe);
+      if (configuration.isAbortOnSingleDatasetFailure()) {
+        throw new RuntimeException(ioe);
+      }
       return Iterators.emptyIterator();
     }
   }
@@ -329,10 +335,14 @@ public class HiveDataset implements 
PrioritizedCopyableDataset {
     }
   }
 
-  private boolean canCopyTable() {
+  private boolean canCopyTable(CopyConfiguration configuration) {
     if (!COPYABLE_TABLES.contains(this.table.getTableType())) {
-      log.warn(String.format("Not copying %s: tables of type %s are not 
copyable.", this.table.getCompleteName(),
-          this.table.getTableType()));
+      String message = String.format("Not copying %s: tables of type %s are 
not copyable.", this.table.getCompleteName(),
+          this.table.getTableType());
+      log.warn(message);
+      if (configuration.isAbortOnSingleDatasetFailure()) {
+        throw new RuntimeException(message);
+      }
       return false;
     }
     return true;
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/HiveDatasetTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/HiveDatasetTest.java
index d9f3733..66174bb 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/HiveDatasetTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/HiveDatasetTest.java
@@ -16,9 +16,19 @@
  */
 package org.apache.gobblin.data.management.copy.hive;
 
+import com.google.common.base.Optional;
 import java.io.IOException;
 
+import java.util.Properties;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.hive.HiveMetastoreClientPool;
 import org.apache.gobblin.util.ConfigUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.ql.metadata.Table;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -34,12 +44,13 @@ public class HiveDatasetTest {
   private static String DUMMY_CONFIG_KEY_WITH_DB_TOKEN = "dummyConfig.withDB";
   private static String DUMMY_CONFIG_KEY_WITH_STRIP_SUFFIX = "dummyConfig" + 
ConfigUtils.STRIP_SUFFIX;
   private static String DUMMY_CONFIG_KEY_WITH_TABLE_TOKEN = 
"dummyConfig.withTable";
-  private static Config config = 
ConfigFactory.parseMap(ImmutableMap.<String,String> builder()
-      .put(DUMMY_CONFIG_KEY_WITH_STRIP_SUFFIX, "testRoot")
-      .put(HiveDatasetVersionCleaner.REPLACEMENT_HIVE_DB_NAME_KEY, 
"resPrefix_$LOGICAL_DB_resPostfix")
-      .put(HiveDatasetVersionCleaner.REPLACEMENT_HIVE_TABLE_NAME_KEY, 
"resPrefix_$LOGICAL_TABLE_resPostfix")
-      .put(DUMMY_CONFIG_KEY_WITH_DB_TOKEN, "resPrefix_$DB_resPostfix")
-      .put(DUMMY_CONFIG_KEY_WITH_TABLE_TOKEN, 
"resPrefix_$TABLE_resPostfix").build());
+  private static Config config = ConfigFactory.parseMap(
+      ImmutableMap.<String, 
String>builder().put(DUMMY_CONFIG_KEY_WITH_STRIP_SUFFIX, "testRoot")
+          .put(HiveDatasetVersionCleaner.REPLACEMENT_HIVE_DB_NAME_KEY, 
"resPrefix_$LOGICAL_DB_resPostfix")
+          .put(HiveDatasetVersionCleaner.REPLACEMENT_HIVE_TABLE_NAME_KEY, 
"resPrefix_$LOGICAL_TABLE_resPostfix")
+          .put(DUMMY_CONFIG_KEY_WITH_DB_TOKEN, "resPrefix_$DB_resPostfix")
+          .put(DUMMY_CONFIG_KEY_WITH_TABLE_TOKEN, 
"resPrefix_$TABLE_resPostfix")
+          .build());
 
   @Test
   public void testParseLogicalDbAndTable() throws IOException {
@@ -58,24 +69,24 @@ public class HiveDatasetTest {
     // Happy Path - without prefix in DB and Table names
     datasetNamePattern = "$LOGICAL_DB_dbPostfix.$LOGICAL_TABLE_tablePostfix";
     logicalDbAndTable = HiveDataset.parseLogicalDbAndTable(datasetNamePattern,
-        new HiveDatasetFinder.DbAndTable("myDB_dbPostfix", 
"myTable_tablePostfix"),
-        HiveDataset.LOGICAL_DB_TOKEN, HiveDataset.LOGICAL_TABLE_TOKEN);
+        new HiveDatasetFinder.DbAndTable("myDB_dbPostfix", 
"myTable_tablePostfix"), HiveDataset.LOGICAL_DB_TOKEN,
+        HiveDataset.LOGICAL_TABLE_TOKEN);
     Assert.assertEquals(logicalDbAndTable.getDb(), "myDB", "DB name not parsed 
correctly");
     Assert.assertEquals(logicalDbAndTable.getTable(), "myTable", "Table name 
not parsed correctly");
 
     // Happy Path - without postfix in DB and Table names
     datasetNamePattern = "dbPrefix_$LOGICAL_DB.tablePrefix_$LOGICAL_TABLE";
     logicalDbAndTable = HiveDataset.parseLogicalDbAndTable(datasetNamePattern,
-        new HiveDatasetFinder.DbAndTable("dbPrefix_myDB", 
"tablePrefix_myTable"),
-        HiveDataset.LOGICAL_DB_TOKEN, HiveDataset.LOGICAL_TABLE_TOKEN);
+        new HiveDatasetFinder.DbAndTable("dbPrefix_myDB", 
"tablePrefix_myTable"), HiveDataset.LOGICAL_DB_TOKEN,
+        HiveDataset.LOGICAL_TABLE_TOKEN);
     Assert.assertEquals(logicalDbAndTable.getDb(), "myDB", "DB name not parsed 
correctly");
     Assert.assertEquals(logicalDbAndTable.getTable(), "myTable", "Table name 
not parsed correctly");
 
     // Happy Path - without any prefix and postfix in DB and Table names
     datasetNamePattern = "$LOGICAL_DB.$LOGICAL_TABLE";
-    logicalDbAndTable = HiveDataset.parseLogicalDbAndTable(datasetNamePattern,
-        new HiveDatasetFinder.DbAndTable("myDB", "myTable"),
-        HiveDataset.LOGICAL_DB_TOKEN, HiveDataset.LOGICAL_TABLE_TOKEN);
+    logicalDbAndTable =
+        HiveDataset.parseLogicalDbAndTable(datasetNamePattern, new 
HiveDatasetFinder.DbAndTable("myDB", "myTable"),
+            HiveDataset.LOGICAL_DB_TOKEN, HiveDataset.LOGICAL_TABLE_TOKEN);
     Assert.assertEquals(logicalDbAndTable.getDb(), "myDB", "DB name not parsed 
correctly");
     Assert.assertEquals(logicalDbAndTable.getTable(), "myTable", "Table name 
not parsed correctly");
 
@@ -118,14 +129,14 @@ public class HiveDatasetTest {
     Assert.assertEquals(tokenValue, "myDB", "DB name not extracted correctly");
 
     // Happy Path - without postfix
-    tokenValue = HiveDataset.extractTokenValueFromEntity("dbPrefix_myDB", 
"dbPrefix_$LOGICAL_DB",
-        HiveDataset.LOGICAL_DB_TOKEN);
+    tokenValue = HiveDataset.extractTokenValueFromEntity("dbPrefix_myDB", 
"dbPrefix_$LOGICAL_DB", HiveDataset.LOGICAL_DB_TOKEN);
     Assert.assertEquals(tokenValue, "myDB", "DB name not extracted correctly");
 
     // Missing token in template
     try {
-      tokenValue = 
HiveDataset.extractTokenValueFromEntity("dbPrefix_myDB_dbPostfix", 
"dbPrefix_$LOGICAL_TABLE_dbPostfix",
-          HiveDataset.LOGICAL_DB_TOKEN);
+      tokenValue =
+          HiveDataset.extractTokenValueFromEntity("dbPrefix_myDB_dbPostfix", 
"dbPrefix_$LOGICAL_TABLE_dbPostfix",
+              HiveDataset.LOGICAL_DB_TOKEN);
       Assert.fail("Token is missing in template, code should have thrown 
exception");
     } catch (IllegalArgumentException e) {
       // Ignore exception, it was expected
@@ -133,8 +144,7 @@ public class HiveDatasetTest {
 
     // Missing source entity
     try {
-      tokenValue = HiveDataset.extractTokenValueFromEntity("", 
"dbPrefix_$LOGICAL_DB_dbPostfix",
-          HiveDataset.LOGICAL_DB_TOKEN);
+      tokenValue = HiveDataset.extractTokenValueFromEntity("", 
"dbPrefix_$LOGICAL_DB_dbPostfix", HiveDataset.LOGICAL_DB_TOKEN);
       Assert.fail("Source entity is missing, code should have thrown 
exception");
     } catch (IllegalArgumentException e) {
       // Ignore exception, it was expected
@@ -142,8 +152,7 @@ public class HiveDatasetTest {
 
     // Missing template
     try {
-      tokenValue = 
HiveDataset.extractTokenValueFromEntity("dbPrefix_myDB_dbPostfix", "",
-          HiveDataset.LOGICAL_DB_TOKEN);
+      tokenValue = 
HiveDataset.extractTokenValueFromEntity("dbPrefix_myDB_dbPostfix", "", 
HiveDataset.LOGICAL_DB_TOKEN);
       Assert.fail("Template is missing, code should have thrown exception");
     } catch (IllegalArgumentException e) {
       // Ignore exception, it was expected
@@ -156,15 +165,67 @@ public class HiveDatasetTest {
     HiveDatasetFinder.DbAndTable logicalDbAndTable = new 
HiveDatasetFinder.DbAndTable("logicalDb", "logicalTable");
     Config resolvedConfig = HiveDataset.resolveConfig(config, realDbAndTable, 
logicalDbAndTable);
 
-    
Assert.assertEquals(resolvedConfig.getString(DUMMY_CONFIG_KEY_WITH_DB_TOKEN),
-        "resPrefix_realDb_resPostfix", "Real DB not resolved correctly");
-    
Assert.assertEquals(resolvedConfig.getString(DUMMY_CONFIG_KEY_WITH_TABLE_TOKEN),
-        "resPrefix_realTable_resPostfix", "Real Table not resolved correctly");
+    
Assert.assertEquals(resolvedConfig.getString(DUMMY_CONFIG_KEY_WITH_DB_TOKEN), 
"resPrefix_realDb_resPostfix",
+        "Real DB not resolved correctly");
+    
Assert.assertEquals(resolvedConfig.getString(DUMMY_CONFIG_KEY_WITH_TABLE_TOKEN),
 "resPrefix_realTable_resPostfix",
+        "Real Table not resolved correctly");
 
     
Assert.assertEquals(resolvedConfig.getString(HiveDatasetVersionCleaner.REPLACEMENT_HIVE_DB_NAME_KEY),
         "resPrefix_logicalDb_resPostfix", "Logical DB not resolved correctly");
     
Assert.assertEquals(resolvedConfig.getString(HiveDatasetVersionCleaner.REPLACEMENT_HIVE_TABLE_NAME_KEY),
         "resPrefix_logicalTable_resPostfix", "Logical Table not resolved 
correctly");
-    
Assert.assertEquals(resolvedConfig.getString(DUMMY_CONFIG_KEY_WITH_STRIP_SUFFIX),"testRoot");
+    
Assert.assertEquals(resolvedConfig.getString(DUMMY_CONFIG_KEY_WITH_STRIP_SUFFIX),
 "testRoot");
+  }
+
+  @Test(expectedExceptions = RuntimeException.class)
+  public void testThrowsErrorIfCopyEntityHelperFails() throws Exception {
+    Properties copyProperties = new Properties();
+    Properties hiveProperties = new Properties();
+    copyProperties.put(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, "/target");
+    // Invoke an IOException by passing in a class that does not exist to 
HiveCopyEntityHelper constructor
+    hiveProperties.put(HiveCopyEntityHelper.COPY_PARTITION_FILTER_GENERATOR, 
"missingClass");
+    Table table = new Table(Table.getEmptyTable("testDB", "testTable"));
+    HiveMetastoreClientPool pool = HiveMetastoreClientPool.get(new 
Properties(), Optional.absent());
+
+    HiveDataset passingDataset = new HiveDataset(new LocalFileSystem(), pool, 
table, hiveProperties);
+    // Even though IOException is thrown HiveDataset should silence it due to 
not having the configuration flag
+    try {
+      CopyConfiguration copyConfigWithoutAbortKey = 
CopyConfiguration.builder(new LocalFileSystem(), copyProperties).build();
+      passingDataset.getFileSetIterator(FileSystem.getLocal(new 
Configuration()), copyConfigWithoutAbortKey);
+    } catch (Exception e) {
+      Assert.fail("IOException should log and fail silently since it is not 
configured");
+    }
+
+    // Exception should propagate to a RuntimeException since flag is enabled
+    copyProperties.put(CopyConfiguration.ABORT_ON_SINGLE_DATASET_FAILURE, 
"true");
+    HiveDataset failingDataset = new HiveDataset(new LocalFileSystem(), pool, 
table, hiveProperties);
+    CopyConfiguration copyConfiguration = CopyConfiguration.builder(new 
LocalFileSystem(), copyProperties).build();
+    failingDataset.getFileSetIterator(FileSystem.getLocal(new 
Configuration()), copyConfiguration);
+  }
+
+  @Test(expectedExceptions = RuntimeException.class)
+  public void testThrowsErrorIfTableNotCopyable() throws Exception {
+    Properties copyProperties = new Properties();
+    Properties hiveProperties = new Properties();
+    copyProperties.put(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, "/target");
+    Table table = new Table(Table.getEmptyTable("testDB", "testTable"));
+    // Virtual view tables are not copyable
+    table.setTableType(TableType.VIRTUAL_VIEW);
+    HiveMetastoreClientPool pool = HiveMetastoreClientPool.get(new 
Properties(), Optional.absent());
+
+    HiveDataset passingDataset = new HiveDataset(new LocalFileSystem(), pool, 
table, hiveProperties);
+    // Since flag is not enabled the dataset should log an error and continue
+    try {
+      CopyConfiguration copyConfigWithoutAbortKey = 
CopyConfiguration.builder(new LocalFileSystem(), copyProperties).build();
+      passingDataset.getFileSetIterator(FileSystem.getLocal(new 
Configuration()), copyConfigWithoutAbortKey);
+    } catch (Exception e) {
+      Assert.fail("IOException should log and fail silently since it is not 
configured");
+    }
+
+    // Exception should propagate to a RuntimeException since flag is enabled
+    copyProperties.put(CopyConfiguration.ABORT_ON_SINGLE_DATASET_FAILURE, 
"true");
+    HiveDataset failingDataset = new HiveDataset(new LocalFileSystem(), pool, 
table, hiveProperties);
+    CopyConfiguration copyConfiguration = CopyConfiguration.builder(new 
LocalFileSystem(), copyProperties).build();
+    failingDataset.getFileSetIterator(FileSystem.getLocal(new 
Configuration()), copyConfiguration);
   }
-}
+}
\ No newline at end of file

Reply via email to