This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 8c2197a  [HUDI-1269] Make it configurable whether a failure to connect to Hive fails the Hudi ingest process (#2443)
8c2197a is described below
commit 8c2197ae5e9c139e488a33f5a507b79bfa2f6f27
Author: liujinhui <[email protected]>
AuthorDate: Thu Feb 25 23:09:32 2021 +0800
    [HUDI-1269] Make it configurable whether a failure to connect to Hive fails the Hudi ingest process (#2443)
Co-authored-by: Sivabalan Narayanan <[email protected]>
---
.../main/java/org/apache/hudi/DataSourceUtils.java | 2 +
.../scala/org/apache/hudi/DataSourceOptions.scala | 2 +
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 1 +
.../java/org/apache/hudi/hive/HiveSyncConfig.java | 4 ++
.../java/org/apache/hudi/hive/HiveSyncTool.java | 74 +++++++++++++---------
.../org/apache/hudi/hive/TestHiveSyncTool.java | 22 +++++++
6 files changed, 76 insertions(+), 29 deletions(-)
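In effect, this commit adds a new Hive sync option, hoodie.datasource.hive_sync.ignore_exceptions (default "false"). When set, a failure to reach the Hive metastore during sync is logged and swallowed instead of failing the whole write. A minimal Scala sketch of how a user might enable it on a Spark datasource write; the table name, fields, JDBC URL, and path below are hypothetical placeholders, not part of this commit:

    import org.apache.spark.sql.{SaveMode, SparkSession}

    val spark = SparkSession.builder().appName("hive-sync-sketch").getOrCreate()
    // Toy frame whose columns match the key/precombine/partition options set below.
    val df = spark.range(1).selectExpr("id as uuid", "id as ts", "'p1' as partitionpath")

    df.write.format("hudi")
      .option("hoodie.table.name", "my_table")                                  // hypothetical
      .option("hoodie.datasource.write.recordkey.field", "uuid")
      .option("hoodie.datasource.write.precombine.field", "ts")
      .option("hoodie.datasource.hive_sync.enable", "true")
      .option("hoodie.datasource.hive_sync.jdbcurl", "jdbc:hive2://hive:10000") // hypothetical
      .option("hoodie.datasource.hive_sync.ignore_exceptions", "true")          // new in this commit
      .mode(SaveMode.Overwrite)
      .save("/tmp/my_table")                                                    // hypothetical

With the option left at its default, a Hive connection failure still fails the ingest, preserving the previous behavior.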
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
index 18c51e3..632a155 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -293,6 +293,8 @@ public class DataSourceUtils {
         DataSourceWriteOptions.DEFAULT_HIVE_USE_JDBC_OPT_VAL()));
     hiveSyncConfig.autoCreateDatabase = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_AUTO_CREATE_DATABASE_OPT_KEY(),
         DataSourceWriteOptions.DEFAULT_HIVE_AUTO_CREATE_DATABASE_OPT_KEY()));
+    hiveSyncConfig.ignoreExceptions = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_IGNORE_EXCEPTIONS_OPT_KEY(),
+        DataSourceWriteOptions.DEFAULT_HIVE_IGNORE_EXCEPTIONS_OPT_KEY()));
     hiveSyncConfig.skipROSuffix = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_SKIP_RO_SUFFIX(),
         DataSourceWriteOptions.DEFAULT_HIVE_SKIP_RO_SUFFIX_VAL()));
     hiveSyncConfig.supportTimestamp = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_SUPPORT_TIMESTAMP(),
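The lookup above follows the existing getString-with-string-default pattern, so a missing key resolves to "false" and the flag stays off unless explicitly enabled. A standalone sketch of the same semantics, assuming TypedProperties behaves as elsewhere in this codebase:

    import org.apache.hudi.common.config.TypedProperties

    val props = new TypedProperties()
    // Key absent: getString falls back to the "false" default string.
    val ignore = java.lang.Boolean.valueOf(
      props.getString("hoodie.datasource.hive_sync.ignore_exceptions", "false"))
    assert(!ignore)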
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 965b35c..4b8e97c 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -347,6 +347,7 @@ object DataSourceWriteOptions {
   val HIVE_USE_PRE_APACHE_INPUT_FORMAT_OPT_KEY = "hoodie.datasource.hive_sync.use_pre_apache_input_format"
   val HIVE_USE_JDBC_OPT_KEY = "hoodie.datasource.hive_sync.use_jdbc"
   val HIVE_AUTO_CREATE_DATABASE_OPT_KEY = "hoodie.datasource.hive_sync.auto_create_database"
+  val HIVE_IGNORE_EXCEPTIONS_OPT_KEY = "hoodie.datasource.hive_sync.ignore_exceptions"
   val HIVE_SKIP_RO_SUFFIX = "hoodie.datasource.hive_sync.skip_ro_suffix"
   val HIVE_SUPPORT_TIMESTAMP = "hoodie.datasource.hive_sync.support_timestamp"
@@ -365,6 +366,7 @@ object DataSourceWriteOptions {
   val DEFAULT_USE_PRE_APACHE_INPUT_FORMAT_OPT_VAL = "false"
   val DEFAULT_HIVE_USE_JDBC_OPT_VAL = "true"
   val DEFAULT_HIVE_AUTO_CREATE_DATABASE_OPT_KEY = "true"
+  val DEFAULT_HIVE_IGNORE_EXCEPTIONS_OPT_KEY = "false"
   val DEFAULT_HIVE_SKIP_RO_SUFFIX_VAL = "false"
   val DEFAULT_HIVE_SUPPORT_TIMESTAMP = "false"
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index f5ba6c8..ef28191 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -374,6 +374,7 @@ private[hudi] object HoodieSparkSqlWriter {
     hiveSyncConfig.useJdbc = parameters(HIVE_USE_JDBC_OPT_KEY).toBoolean
     hiveSyncConfig.useFileListingFromMetadata = parameters(HoodieMetadataConfig.METADATA_ENABLE_PROP).toBoolean
     hiveSyncConfig.verifyMetadataFileListing = parameters(HoodieMetadataConfig.METADATA_VALIDATE_PROP).toBoolean
+    hiveSyncConfig.ignoreExceptions = parameters.get(HIVE_IGNORE_EXCEPTIONS_OPT_KEY).exists(r => r.toBoolean)
     hiveSyncConfig.supportTimestamp = parameters.get(HIVE_SUPPORT_TIMESTAMP).exists(r => r.toBoolean)
     hiveSyncConfig.autoCreateDatabase = parameters.get(HIVE_AUTO_CREATE_DATABASE_OPT_KEY).exists(r => r.toBoolean)
     hiveSyncConfig.decodePartition = parameters.getOrElse(URL_ENCODE_PARTITIONING_OPT_KEY,
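Note the parameters.get(...).exists(r => r.toBoolean) idiom in the added line: it yields false both when the key is absent and when the value parses to false, which matches the "false" default declared in DataSourceOptions.scala. A tiny self-contained illustration (the map contents are hypothetical):

    val key = "hoodie.datasource.hive_sync.ignore_exceptions"
    val set    = Map(key -> "true").get(key).exists(_.toBoolean)         // true
    val absent = Map.empty[String, String].get(key).exists(_.toBoolean)  // false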
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
index dd9d483..0063d15 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
@@ -76,6 +76,9 @@ public class HiveSyncConfig implements Serializable {
   @Parameter(names = {"--auto-create-database"}, description = "Auto create hive database")
   public Boolean autoCreateDatabase = true;
 
+  @Parameter(names = {"--ignore-exceptions"}, description = "Ignore hive exceptions")
+  public Boolean ignoreExceptions = false;
+
   @Parameter(names = {"--skip-ro-suffix"}, description = "Skip the `_ro` suffix for Read optimized table, when registering")
   public Boolean skipROSuffix = false;
@@ -130,6 +133,7 @@ public class HiveSyncConfig implements Serializable {
         + ", usePreApacheInputFormat=" + usePreApacheInputFormat
         + ", useJdbc=" + useJdbc
         + ", autoCreateDatabase=" + autoCreateDatabase
+        + ", ignoreExceptions=" + ignoreExceptions
         + ", skipROSuffix=" + skipROSuffix
         + ", help=" + help
         + ", supportTimestamp=" + supportTimestamp
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
index 47d4500..bbda97e 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
@@ -58,56 +58,72 @@ public class HiveSyncTool extends AbstractSyncTool {
   public static final String SUFFIX_READ_OPTIMIZED_TABLE = "_ro";
 
   private final HiveSyncConfig cfg;
-  private final HoodieHiveClient hoodieHiveClient;
-  private final String snapshotTableName;
-  private final Option<String> roTableTableName;
+  private HoodieHiveClient hoodieHiveClient = null;
+  private String snapshotTableName = null;
+  private Option<String> roTableName = null;
 
   public HiveSyncTool(HiveSyncConfig cfg, HiveConf configuration, FileSystem fs) {
     super(configuration.getAllProperties(), fs);
-    this.hoodieHiveClient = new HoodieHiveClient(cfg, configuration, fs);
+
+    try {
+      this.hoodieHiveClient = new HoodieHiveClient(cfg, configuration, fs);
+    } catch (RuntimeException e) {
+      if (cfg.ignoreExceptions) {
+        LOG.error("Got runtime exception when hive syncing, but continuing as ignoreExceptions config is set ", e);
+      } else {
+        throw new HoodieHiveSyncException("Got runtime exception when hive syncing", e);
+      }
+    }
+
     this.cfg = cfg;
     // Set partitionFields to empty, when the NonPartitionedExtractor is used
     if (NonPartitionedExtractor.class.getName().equals(cfg.partitionValueExtractorClass)) {
       LOG.warn("Set partitionFields to empty, since the NonPartitionedExtractor is used");
       cfg.partitionFields = new ArrayList<>();
     }
-    switch (hoodieHiveClient.getTableType()) {
-      case COPY_ON_WRITE:
-        this.snapshotTableName = cfg.tableName;
-        this.roTableTableName = Option.empty();
-        break;
-      case MERGE_ON_READ:
-        this.snapshotTableName = cfg.tableName + SUFFIX_SNAPSHOT_TABLE;
-        this.roTableTableName = cfg.skipROSuffix ? Option.of(cfg.tableName) :
-            Option.of(cfg.tableName + SUFFIX_READ_OPTIMIZED_TABLE);
-        break;
-      default:
-        LOG.error("Unknown table type " + hoodieHiveClient.getTableType());
-        throw new InvalidTableException(hoodieHiveClient.getBasePath());
-    }
-  }
-
-  @Override
-  public void syncHoodieTable() {
-    try {
+    if (hoodieHiveClient != null) {
       switch (hoodieHiveClient.getTableType()) {
         case COPY_ON_WRITE:
-          syncHoodieTable(snapshotTableName, false);
+          this.snapshotTableName = cfg.tableName;
+          this.roTableName = Option.empty();
           break;
         case MERGE_ON_READ:
-          // sync a RO table for MOR
-          syncHoodieTable(roTableTableName.get(), false);
-          // sync a RT table for MOR
-          syncHoodieTable(snapshotTableName, true);
+          this.snapshotTableName = cfg.tableName + SUFFIX_SNAPSHOT_TABLE;
+          this.roTableName = cfg.skipROSuffix ? Option.of(cfg.tableName) :
+              Option.of(cfg.tableName + SUFFIX_READ_OPTIMIZED_TABLE);
           break;
         default:
           LOG.error("Unknown table type " + hoodieHiveClient.getTableType());
          throw new InvalidTableException(hoodieHiveClient.getBasePath());
      }
+    }
+  }
+
+  @Override
+  public void syncHoodieTable() {
+    try {
+      if (hoodieHiveClient != null) {
+        switch (hoodieHiveClient.getTableType()) {
+          case COPY_ON_WRITE:
+            syncHoodieTable(snapshotTableName, false);
+            break;
+          case MERGE_ON_READ:
+            // sync a RO table for MOR
+            syncHoodieTable(roTableName.get(), false);
+            // sync a RT table for MOR
+            syncHoodieTable(snapshotTableName, true);
+            break;
+          default:
+            LOG.error("Unknown table type " + hoodieHiveClient.getTableType());
+            throw new InvalidTableException(hoodieHiveClient.getBasePath());
+        }
+      }
     } catch (RuntimeException re) {
       throw new HoodieException("Got runtime exception when hive syncing " + cfg.tableName, re);
     } finally {
-      hoodieHiveClient.close();
+      if (hoodieHiveClient != null) {
+        hoodieHiveClient.close();
+      }
     }
   }
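Behaviorally: when ignoreExceptions is set and the HoodieHiveClient constructor throws, the client stays null, the HiveSyncTool constructor logs and returns, and syncHoodieTable() degrades to a guarded no-op. A sketch of the caller-visible effect, assuming a Hadoop FileSystem and HiveConf are obtainable as below (the table, paths, and JDBC URL are hypothetical):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.FileSystem
    import org.apache.hadoop.hive.conf.HiveConf
    import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}

    val cfg = new HiveSyncConfig()
    cfg.tableName = "my_table"                          // hypothetical
    cfg.basePath = "/tmp/my_table"                      // hypothetical
    cfg.jdbcUrl = "jdbc:hive2://unreachable-host:9031"  // hypothetical bad endpoint
    cfg.ignoreExceptions = true

    val hadoopConf = new Configuration()
    val fs = FileSystem.get(hadoopConf)
    val tool = new HiveSyncTool(cfg, new HiveConf(hadoopConf, classOf[HiveConf]), fs)
    tool.syncHoodieTable() // logs the connect failure and returns instead of throwing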
diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
index 8a1ea4f..c38a6ed 100644
--- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
+++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
@@ -40,6 +40,7 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.IOException;
+import java.net.URISyntaxException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -613,4 +614,25 @@ public class TestHiveSyncTool {
         "The last commit that was synced should be 103");
   }
 
+  @Test
+  public void testConnectExceptionIgnoreConfigSet() throws IOException, URISyntaxException {
+    HiveTestUtil.hiveSyncConfig.useJdbc = true;
+    String instantTime = "100";
+    HiveTestUtil.createCOWTable(instantTime, 5, false);
+    HoodieHiveClient hiveClient =
+        new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+    assertFalse(hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName),
+        "Table " + HiveTestUtil.hiveSyncConfig.tableName + " should not exist initially");
+    // Let's do the sync
+
+    HiveSyncConfig syncToolConfig = HiveSyncConfig.copy(HiveTestUtil.hiveSyncConfig);
+    syncToolConfig.ignoreExceptions = true;
+    syncToolConfig.jdbcUrl = HiveTestUtil.hiveSyncConfig.jdbcUrl.replace("9999", "9031");
+    HiveSyncTool tool = new HiveSyncTool(syncToolConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+    tool.syncHoodieTable();
+
+    assertFalse(hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName),
+        "Table " + HiveTestUtil.hiveSyncConfig.tableName + " should still not exist, since Hive was unreachable and ignoreExceptions is set");
+  }
+
 }