This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 5bdd73c [GOBBLIN-1603] Throws error if configured when encountering
an IO exception while collecting copy entities (#3460)
5bdd73c is described below
commit 5bdd73c5f61e7b2522d02261ca14c06e5d763611
Author: William Lo <[email protected]>
AuthorDate: Tue Feb 8 13:22:34 2022 -0800
[GOBBLIN-1603] Throws error if configured when encountering an IO exception
while collecting copy entities (#3460)
* Throws error if configured when encountering an IO exception while
collecting copy entities
* Fix checkstyle
---
.../data/management/copy/hive/HiveDataset.java | 20 +++-
.../data/management/copy/hive/HiveDatasetTest.java | 115 ++++++++++++++++-----
2 files changed, 103 insertions(+), 32 deletions(-)
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDataset.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDataset.java
index 319b980..7032a88 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDataset.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDataset.java
@@ -148,13 +148,16 @@ public class HiveDataset implements
PrioritizedCopyableDataset {
@Override
public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs,
CopyConfiguration configuration)
throws IOException {
- if (!canCopyTable()) {
+ if (!canCopyTable(configuration)) {
return Iterators.emptyIterator();
}
try {
return new HiveCopyEntityHelper(this, configuration,
targetFs).getCopyEntities(configuration);
} catch (IOException ioe) {
log.error("Failed to copy table " + this.table, ioe);
+ if (configuration.isAbortOnSingleDatasetFailure()) {
+ throw new RuntimeException(ioe);
+ }
return Iterators.emptyIterator();
}
}
@@ -167,7 +170,7 @@ public class HiveDataset implements
PrioritizedCopyableDataset {
public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs,
CopyConfiguration configuration,
Comparator<FileSet<CopyEntity>> prioritizer,
PushDownRequestor<FileSet<CopyEntity>> requestor)
throws IOException {
- if (!canCopyTable()) {
+ if (!canCopyTable(configuration)) {
return Iterators.emptyIterator();
}
try {
@@ -177,6 +180,9 @@ public class HiveDataset implements
PrioritizedCopyableDataset {
return fileSetList.iterator();
} catch (IOException ioe) {
log.error("Failed to copy table " + this.table, ioe);
+ if (configuration.isAbortOnSingleDatasetFailure()) {
+ throw new RuntimeException(ioe);
+ }
return Iterators.emptyIterator();
}
}
@@ -329,10 +335,14 @@ public class HiveDataset implements
PrioritizedCopyableDataset {
}
}
- private boolean canCopyTable() {
+ private boolean canCopyTable(CopyConfiguration configuration) {
if (!COPYABLE_TABLES.contains(this.table.getTableType())) {
- log.warn(String.format("Not copying %s: tables of type %s are not
copyable.", this.table.getCompleteName(),
- this.table.getTableType()));
+ String message = String.format("Not copying %s: tables of type %s are
not copyable.", this.table.getCompleteName(),
+ this.table.getTableType());
+ log.warn(message);
+ if (configuration.isAbortOnSingleDatasetFailure()) {
+ throw new RuntimeException(message);
+ }
return false;
}
return true;
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/HiveDatasetTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/HiveDatasetTest.java
index d9f3733..66174bb 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/HiveDatasetTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/HiveDatasetTest.java
@@ -16,9 +16,19 @@
*/
package org.apache.gobblin.data.management.copy.hive;
+import com.google.common.base.Optional;
import java.io.IOException;
+import java.util.Properties;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.hive.HiveMetastoreClientPool;
import org.apache.gobblin.util.ConfigUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.ql.metadata.Table;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -34,12 +44,13 @@ public class HiveDatasetTest {
private static String DUMMY_CONFIG_KEY_WITH_DB_TOKEN = "dummyConfig.withDB";
private static String DUMMY_CONFIG_KEY_WITH_STRIP_SUFFIX = "dummyConfig" +
ConfigUtils.STRIP_SUFFIX;
private static String DUMMY_CONFIG_KEY_WITH_TABLE_TOKEN =
"dummyConfig.withTable";
- private static Config config =
ConfigFactory.parseMap(ImmutableMap.<String,String> builder()
- .put(DUMMY_CONFIG_KEY_WITH_STRIP_SUFFIX, "testRoot")
- .put(HiveDatasetVersionCleaner.REPLACEMENT_HIVE_DB_NAME_KEY,
"resPrefix_$LOGICAL_DB_resPostfix")
- .put(HiveDatasetVersionCleaner.REPLACEMENT_HIVE_TABLE_NAME_KEY,
"resPrefix_$LOGICAL_TABLE_resPostfix")
- .put(DUMMY_CONFIG_KEY_WITH_DB_TOKEN, "resPrefix_$DB_resPostfix")
- .put(DUMMY_CONFIG_KEY_WITH_TABLE_TOKEN,
"resPrefix_$TABLE_resPostfix").build());
+ private static Config config = ConfigFactory.parseMap(
+ ImmutableMap.<String,
String>builder().put(DUMMY_CONFIG_KEY_WITH_STRIP_SUFFIX, "testRoot")
+ .put(HiveDatasetVersionCleaner.REPLACEMENT_HIVE_DB_NAME_KEY,
"resPrefix_$LOGICAL_DB_resPostfix")
+ .put(HiveDatasetVersionCleaner.REPLACEMENT_HIVE_TABLE_NAME_KEY,
"resPrefix_$LOGICAL_TABLE_resPostfix")
+ .put(DUMMY_CONFIG_KEY_WITH_DB_TOKEN, "resPrefix_$DB_resPostfix")
+ .put(DUMMY_CONFIG_KEY_WITH_TABLE_TOKEN,
"resPrefix_$TABLE_resPostfix")
+ .build());
@Test
public void testParseLogicalDbAndTable() throws IOException {
@@ -58,24 +69,24 @@ public class HiveDatasetTest {
// Happy Path - without prefix in DB and Table names
datasetNamePattern = "$LOGICAL_DB_dbPostfix.$LOGICAL_TABLE_tablePostfix";
logicalDbAndTable = HiveDataset.parseLogicalDbAndTable(datasetNamePattern,
- new HiveDatasetFinder.DbAndTable("myDB_dbPostfix",
"myTable_tablePostfix"),
- HiveDataset.LOGICAL_DB_TOKEN, HiveDataset.LOGICAL_TABLE_TOKEN);
+ new HiveDatasetFinder.DbAndTable("myDB_dbPostfix",
"myTable_tablePostfix"), HiveDataset.LOGICAL_DB_TOKEN,
+ HiveDataset.LOGICAL_TABLE_TOKEN);
Assert.assertEquals(logicalDbAndTable.getDb(), "myDB", "DB name not parsed
correctly");
Assert.assertEquals(logicalDbAndTable.getTable(), "myTable", "Table name
not parsed correctly");
// Happy Path - without postfix in DB and Table names
datasetNamePattern = "dbPrefix_$LOGICAL_DB.tablePrefix_$LOGICAL_TABLE";
logicalDbAndTable = HiveDataset.parseLogicalDbAndTable(datasetNamePattern,
- new HiveDatasetFinder.DbAndTable("dbPrefix_myDB",
"tablePrefix_myTable"),
- HiveDataset.LOGICAL_DB_TOKEN, HiveDataset.LOGICAL_TABLE_TOKEN);
+ new HiveDatasetFinder.DbAndTable("dbPrefix_myDB",
"tablePrefix_myTable"), HiveDataset.LOGICAL_DB_TOKEN,
+ HiveDataset.LOGICAL_TABLE_TOKEN);
Assert.assertEquals(logicalDbAndTable.getDb(), "myDB", "DB name not parsed
correctly");
Assert.assertEquals(logicalDbAndTable.getTable(), "myTable", "Table name
not parsed correctly");
// Happy Path - without any prefix and postfix in DB and Table names
datasetNamePattern = "$LOGICAL_DB.$LOGICAL_TABLE";
- logicalDbAndTable = HiveDataset.parseLogicalDbAndTable(datasetNamePattern,
- new HiveDatasetFinder.DbAndTable("myDB", "myTable"),
- HiveDataset.LOGICAL_DB_TOKEN, HiveDataset.LOGICAL_TABLE_TOKEN);
+ logicalDbAndTable =
+ HiveDataset.parseLogicalDbAndTable(datasetNamePattern, new
HiveDatasetFinder.DbAndTable("myDB", "myTable"),
+ HiveDataset.LOGICAL_DB_TOKEN, HiveDataset.LOGICAL_TABLE_TOKEN);
Assert.assertEquals(logicalDbAndTable.getDb(), "myDB", "DB name not parsed
correctly");
Assert.assertEquals(logicalDbAndTable.getTable(), "myTable", "Table name
not parsed correctly");
@@ -118,14 +129,14 @@ public class HiveDatasetTest {
Assert.assertEquals(tokenValue, "myDB", "DB name not extracted correctly");
// Happy Path - without postfix
- tokenValue = HiveDataset.extractTokenValueFromEntity("dbPrefix_myDB",
"dbPrefix_$LOGICAL_DB",
- HiveDataset.LOGICAL_DB_TOKEN);
+ tokenValue = HiveDataset.extractTokenValueFromEntity("dbPrefix_myDB",
"dbPrefix_$LOGICAL_DB", HiveDataset.LOGICAL_DB_TOKEN);
Assert.assertEquals(tokenValue, "myDB", "DB name not extracted correctly");
// Missing token in template
try {
- tokenValue =
HiveDataset.extractTokenValueFromEntity("dbPrefix_myDB_dbPostfix",
"dbPrefix_$LOGICAL_TABLE_dbPostfix",
- HiveDataset.LOGICAL_DB_TOKEN);
+ tokenValue =
+ HiveDataset.extractTokenValueFromEntity("dbPrefix_myDB_dbPostfix",
"dbPrefix_$LOGICAL_TABLE_dbPostfix",
+ HiveDataset.LOGICAL_DB_TOKEN);
Assert.fail("Token is missing in template, code should have thrown
exception");
} catch (IllegalArgumentException e) {
// Ignore exception, it was expected
@@ -133,8 +144,7 @@ public class HiveDatasetTest {
// Missing source entity
try {
- tokenValue = HiveDataset.extractTokenValueFromEntity("",
"dbPrefix_$LOGICAL_DB_dbPostfix",
- HiveDataset.LOGICAL_DB_TOKEN);
+ tokenValue = HiveDataset.extractTokenValueFromEntity("",
"dbPrefix_$LOGICAL_DB_dbPostfix", HiveDataset.LOGICAL_DB_TOKEN);
Assert.fail("Source entity is missing, code should have thrown
exception");
} catch (IllegalArgumentException e) {
// Ignore exception, it was expected
@@ -142,8 +152,7 @@ public class HiveDatasetTest {
// Missing template
try {
- tokenValue =
HiveDataset.extractTokenValueFromEntity("dbPrefix_myDB_dbPostfix", "",
- HiveDataset.LOGICAL_DB_TOKEN);
+ tokenValue =
HiveDataset.extractTokenValueFromEntity("dbPrefix_myDB_dbPostfix", "",
HiveDataset.LOGICAL_DB_TOKEN);
Assert.fail("Template is missing, code should have thrown exception");
} catch (IllegalArgumentException e) {
// Ignore exception, it was expected
@@ -156,15 +165,67 @@ public class HiveDatasetTest {
HiveDatasetFinder.DbAndTable logicalDbAndTable = new
HiveDatasetFinder.DbAndTable("logicalDb", "logicalTable");
Config resolvedConfig = HiveDataset.resolveConfig(config, realDbAndTable,
logicalDbAndTable);
-
Assert.assertEquals(resolvedConfig.getString(DUMMY_CONFIG_KEY_WITH_DB_TOKEN),
- "resPrefix_realDb_resPostfix", "Real DB not resolved correctly");
-
Assert.assertEquals(resolvedConfig.getString(DUMMY_CONFIG_KEY_WITH_TABLE_TOKEN),
- "resPrefix_realTable_resPostfix", "Real Table not resolved correctly");
+
Assert.assertEquals(resolvedConfig.getString(DUMMY_CONFIG_KEY_WITH_DB_TOKEN),
"resPrefix_realDb_resPostfix",
+ "Real DB not resolved correctly");
+
Assert.assertEquals(resolvedConfig.getString(DUMMY_CONFIG_KEY_WITH_TABLE_TOKEN),
"resPrefix_realTable_resPostfix",
+ "Real Table not resolved correctly");
Assert.assertEquals(resolvedConfig.getString(HiveDatasetVersionCleaner.REPLACEMENT_HIVE_DB_NAME_KEY),
"resPrefix_logicalDb_resPostfix", "Logical DB not resolved correctly");
Assert.assertEquals(resolvedConfig.getString(HiveDatasetVersionCleaner.REPLACEMENT_HIVE_TABLE_NAME_KEY),
"resPrefix_logicalTable_resPostfix", "Logical Table not resolved
correctly");
-
Assert.assertEquals(resolvedConfig.getString(DUMMY_CONFIG_KEY_WITH_STRIP_SUFFIX),"testRoot");
+
Assert.assertEquals(resolvedConfig.getString(DUMMY_CONFIG_KEY_WITH_STRIP_SUFFIX),
"testRoot");
+ }
+
+ @Test(expectedExceptions = RuntimeException.class)
+ public void testThrowsErrorIfCopyEntityHelperFails() throws Exception {
+ Properties copyProperties = new Properties();
+ Properties hiveProperties = new Properties();
+ copyProperties.put(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, "/target");
+ // Invoke an IOException by passing in a class that does not exist to
HiveCopyEntityHelper constructor
+ hiveProperties.put(HiveCopyEntityHelper.COPY_PARTITION_FILTER_GENERATOR,
"missingClass");
+ Table table = new Table(Table.getEmptyTable("testDB", "testTable"));
+ HiveMetastoreClientPool pool = HiveMetastoreClientPool.get(new
Properties(), Optional.absent());
+
+ HiveDataset passingDataset = new HiveDataset(new LocalFileSystem(), pool,
table, hiveProperties);
+ // Even though IOException is thrown HiveDataset should silence it due to
not having the configuration flag
+ try {
+ CopyConfiguration copyConfigWithoutAbortKey =
CopyConfiguration.builder(new LocalFileSystem(), copyProperties).build();
+ passingDataset.getFileSetIterator(FileSystem.getLocal(new
Configuration()), copyConfigWithoutAbortKey);
+ } catch (Exception e) {
+ Assert.fail("IOException should log and fail silently since it is not
configured");
+ }
+
+ // Exception should propagate to a RuntimeException since flag is enabled
+ copyProperties.put(CopyConfiguration.ABORT_ON_SINGLE_DATASET_FAILURE,
"true");
+ HiveDataset failingDataset = new HiveDataset(new LocalFileSystem(), pool,
table, hiveProperties);
+ CopyConfiguration copyConfiguration = CopyConfiguration.builder(new
LocalFileSystem(), copyProperties).build();
+ failingDataset.getFileSetIterator(FileSystem.getLocal(new
Configuration()), copyConfiguration);
+ }
+
+ @Test(expectedExceptions = RuntimeException.class)
+ public void testThrowsErrorIfTableNotCopyable() throws Exception {
+ Properties copyProperties = new Properties();
+ Properties hiveProperties = new Properties();
+ copyProperties.put(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, "/target");
+ Table table = new Table(Table.getEmptyTable("testDB", "testTable"));
+ // Virtual view tables are not copyable
+ table.setTableType(TableType.VIRTUAL_VIEW);
+ HiveMetastoreClientPool pool = HiveMetastoreClientPool.get(new
Properties(), Optional.absent());
+
+ HiveDataset passingDataset = new HiveDataset(new LocalFileSystem(), pool,
table, hiveProperties);
+ // Since flag is not enabled the dataset should log an error and continue
+ try {
+ CopyConfiguration copyConfigWithoutAbortKey =
CopyConfiguration.builder(new LocalFileSystem(), copyProperties).build();
+ passingDataset.getFileSetIterator(FileSystem.getLocal(new
Configuration()), copyConfigWithoutAbortKey);
+ } catch (Exception e) {
+ Assert.fail("IOException should log and fail silently since it is not
configured");
+ }
+
+ // Exception should propagate to a RuntimeException since flag is enabled
+ copyProperties.put(CopyConfiguration.ABORT_ON_SINGLE_DATASET_FAILURE,
"true");
+ HiveDataset failingDataset = new HiveDataset(new LocalFileSystem(), pool,
table, hiveProperties);
+ CopyConfiguration copyConfiguration = CopyConfiguration.builder(new
LocalFileSystem(), copyProperties).build();
+ failingDataset.getFileSetIterator(FileSystem.getLocal(new
Configuration()), copyConfiguration);
}
-}
+}
\ No newline at end of file