This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 31192242cea [HUDI-6261] Make deltastreamer exceptions more descriptive
(#8638)
31192242cea is described below
commit 31192242cea06c08ac00f537cb87fe547fe18ec9
Author: Jon Vexler <[email protected]>
AuthorDate: Sat May 27 14:25:04 2023 -0400
[HUDI-6261] Make deltastreamer exceptions more descriptive (#8638)
---
.../org/apache/hudi/AvroConversionUtils.scala | 21 +++++---
.../HoodieIncrementalPathNotFoundException.java | 33 +++++++++++++
.../hudi/exception/HoodieMetaSyncException.java | 39 +++++++++++++++
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 9 ++--
.../org/apache/hudi/IncrementalRelation.scala | 32 ++++++++-----
.../TestIncrementalReadWithFullTableScan.scala | 3 +-
.../hudi/sync/common/util/SyncUtilHelpers.java | 23 +++++----
.../org/apache/hudi/utilities/UtilHelpers.java | 35 +++++++++-----
.../hudi/utilities/deltastreamer/DeltaSync.java | 33 +++++++------
.../HoodieDeltaStreamerWriteException.java | 30 ++++++++++++
.../exception/HoodieReadFromSourceException.java | 32 +++++++++++++
.../exception/HoodieSchemaFetchException.java | 33 +++++++++++++
.../exception/HoodieTransformException.java | 34 +++++++++++++
.../HoodieTransformExecutionException.java | 28 +++++++++++
.../exception/HoodieTransformPlanException.java | 33 +++++++++++++
.../utilities/schema/FilebasedSchemaProvider.java | 4 +-
.../hudi/utilities/schema/HiveSchemaProvider.java | 6 +--
.../utilities/schema/JdbcbasedSchemaProvider.java | 8 +---
.../utilities/schema/KafkaOffsetPostProcessor.java | 22 +++++----
.../schema/ProtoClassBasedSchemaProvider.java | 9 +++-
.../utilities/schema/RowBasedSchemaProvider.java | 3 +-
.../utilities/schema/SchemaRegistryProvider.java | 56 ++++++++++++----------
.../hudi/utilities/sources/AvroKafkaSource.java | 9 ++--
.../hudi/utilities/sources/GcsEventsSource.java | 13 +++--
.../hudi/utilities/sources/HiveIncrPullSource.java | 6 +--
.../apache/hudi/utilities/sources/JdbcSource.java | 3 +-
.../hudi/utilities/sources/ProtoKafkaSource.java | 8 ++--
.../hudi/utilities/sources/PulsarSource.java | 5 +-
.../utilities/sources/debezium/DebeziumSource.java | 11 ++---
.../utilities/sources/helpers/AvroConvertor.java | 45 +++++++++++++----
.../utilities/transform/ChainedTransformer.java | 18 +++----
.../utilities/transform/FlatteningTransformer.java | 20 ++++----
.../transform/SqlFileBasedTransformer.java | 7 +--
.../transform/SqlQueryBasedTransformer.java | 26 ++++++----
.../deltastreamer/TestHoodieDeltaStreamer.java | 3 +-
.../multisync/TestMultipleMetaSync.java | 6 +--
.../functional/TestChainedTransformer.java | 3 +-
.../TestErrorTableAwareChainedTransformer.java | 3 +-
.../debezium/TestAbstractDebeziumSource.java | 3 +-
.../transform/TestSqlFileBasedTransformer.java | 4 +-
40 files changed, 541 insertions(+), 178 deletions(-)
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
index 8e44dc4b1fe..be86cd37df9 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
@@ -22,6 +22,7 @@ import org.apache.avro.generic.GenericRecord
import org.apache.avro.{JsonProperties, Schema}
import org.apache.hudi.HoodieSparkUtils.sparkAdapter
import org.apache.hudi.avro.AvroSchemaUtils
+import org.apache.hudi.internal.schema.HoodieSchemaException
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
@@ -138,18 +139,26 @@ object AvroConversionUtils {
def convertStructTypeToAvroSchema(structType: DataType,
structName: String,
recordNamespace: String): Schema = {
- val schemaConverters = sparkAdapter.getAvroSchemaConverters
- val avroSchema = schemaConverters.toAvroType(structType, nullable = false,
structName, recordNamespace)
- getAvroSchemaWithDefaults(avroSchema, structType)
+ try {
+ val schemaConverters = sparkAdapter.getAvroSchemaConverters
+ val avroSchema = schemaConverters.toAvroType(structType, nullable =
false, structName, recordNamespace)
+ getAvroSchemaWithDefaults(avroSchema, structType)
+ } catch {
+ case e: Exception => throw new HoodieSchemaException("Failed to convert
struct type to avro schema: " + structType, e)
+ }
}
/**
* Converts Avro's [[Schema]] to Catalyst's [[StructType]]
*/
def convertAvroSchemaToStructType(avroSchema: Schema): StructType = {
- val schemaConverters = sparkAdapter.getAvroSchemaConverters
- schemaConverters.toSqlType(avroSchema) match {
- case (dataType, _) => dataType.asInstanceOf[StructType]
+ try {
+ val schemaConverters = sparkAdapter.getAvroSchemaConverters
+ schemaConverters.toSqlType(avroSchema) match {
+ case (dataType, _) => dataType.asInstanceOf[StructType]
+ }
+ } catch {
+ case e: Exception => throw new HoodieSchemaException("Failed to convert
avro schema to struct type: " + avroSchema, e)
}
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/exception/HoodieIncrementalPathNotFoundException.java
b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieIncrementalPathNotFoundException.java
new file mode 100644
index 00000000000..cce29f96050
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieIncrementalPathNotFoundException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.exception;
+
+/**
+ * Thrown when a path is not found during an incremental query.
+ * See [HUDI-2711] for more details on the issue.
+ */
+public class HoodieIncrementalPathNotFoundException extends HoodieException {
+
+ public HoodieIncrementalPathNotFoundException(Throwable e) {
+ super("Path not found during incremental query. It is likely that the
underlying file has been "
+ + "moved or deleted by the cleaner. Consider setting "
+ + "hoodie.datasource.read.incr.fallback.fulltablescan.enable to
true.", e);
+ }
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/exception/HoodieMetaSyncException.java
b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieMetaSyncException.java
new file mode 100644
index 00000000000..ba9e51c2ee1
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieMetaSyncException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.exception;
+
+import java.util.Map;
+
+public class HoodieMetaSyncException extends HoodieException {
+
+ private Map<String,HoodieException> failedMetaSyncs;
+
+ public Map<String,HoodieException> getFailedMetaSyncs() {
+ return failedMetaSyncs;
+ }
+
+ public HoodieMetaSyncException(String msg, Throwable e) {
+ super(msg, e);
+ }
+
+ public HoodieMetaSyncException(String msg, Map<String,HoodieException>
failedMetaSyncs) {
+ super(msg);
+ this.failedMetaSyncs = failedMetaSyncs;
+ }
+}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 2a2e0d59e7d..f353f78b063 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -59,6 +59,7 @@ import org.apache.hudi.keygen.{BaseKeyGenerator, KeyGenUtils,
SparkKeyGeneratorI
import org.apache.hudi.metrics.Metrics
import org.apache.hudi.sync.common.HoodieSyncConfig
import org.apache.hudi.sync.common.util.SyncUtilHelpers
+import
org.apache.hudi.sync.common.util.SyncUtilHelpers.getHoodieMetaSyncException
import org.apache.hudi.table.BulkInsertPartitioner
import org.apache.hudi.util.SparkKeyGenUtils
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
@@ -907,18 +908,18 @@ object HoodieSparkSqlWriter {
properties.put(HoodieSyncConfig.META_SYNC_USE_FILE_LISTING_FROM_METADATA.key,
hoodieConfig.getBoolean(HoodieMetadataConfig.ENABLE))
// Collect exceptions in list because we want all sync to run. Then we
can throw
- val metaSyncExceptions = new ListBuffer[HoodieException]()
+ val failedMetaSyncs = new mutable.HashMap[String,HoodieException]()
syncClientToolClassSet.foreach(impl => {
try {
SyncUtilHelpers.runHoodieMetaSync(impl.trim, properties, fs.getConf,
fs, basePath.toString, baseFileFormat)
} catch {
case e: HoodieException =>
log.info("SyncTool class " + impl.trim + " failed with exception",
e)
- metaSyncExceptions.add(e)
+ failedMetaSyncs.put(impl, e)
}
})
- if (metaSyncExceptions.nonEmpty) {
- throw SyncUtilHelpers.getExceptionFromList(metaSyncExceptions)
+ if (failedMetaSyncs.nonEmpty) {
+ throw getHoodieMetaSyncException(failedMetaSyncs)
}
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
index 5624120ae0d..b81a749f613 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
@@ -28,7 +28,7 @@ import org.apache.hudi.common.table.timeline.{HoodieInstant,
HoodieTimeline}
import org.apache.hudi.common.table.{HoodieTableMetaClient,
TableSchemaResolver}
import org.apache.hudi.common.util.{HoodieTimer, InternalSchemaCache}
import org.apache.hudi.config.HoodieWriteConfig
-import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.exception.{HoodieException,
HoodieIncrementalPathNotFoundException}
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.internal.schema.utils.SerDeHelper
import org.apache.hudi.table.HoodieSparkTable
@@ -37,7 +37,7 @@ import org.apache.spark.rdd.RDD
import
org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SQLContext}
import org.slf4j.LoggerFactory
import scala.collection.JavaConversions._
@@ -261,15 +261,25 @@ class IncrementalRelation(val sqlContext: SQLContext,
}
if (regularFileIdToFullPath.nonEmpty) {
- df = df.union(sqlContext.read.options(sOpts)
- .schema(usedSchema).format(formatClassName)
- // Setting time to the END_INSTANT_TIME, to avoid pathFilter
filter out files incorrectly.
- .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key(),
endInstantTime)
- .load(filteredRegularFullPaths.toList: _*)
- .filter(String.format("%s >= '%s'",
HoodieRecord.COMMIT_TIME_METADATA_FIELD,
- commitsToReturn.head.getTimestamp))
- .filter(String.format("%s <= '%s'",
HoodieRecord.COMMIT_TIME_METADATA_FIELD,
- commitsToReturn.last.getTimestamp)))
+ try {
+ df = df.union(sqlContext.read.options(sOpts)
+ .schema(usedSchema).format(formatClassName)
+ // Setting time to the END_INSTANT_TIME, to avoid pathFilter
filter out files incorrectly.
+
.option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key(), endInstantTime)
+ .load(filteredRegularFullPaths.toList: _*)
+ .filter(String.format("%s >= '%s'",
HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+ commitsToReturn.head.getTimestamp))
+ .filter(String.format("%s <= '%s'",
HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+ commitsToReturn.last.getTimestamp)))
+ } catch {
+ case e : AnalysisException =>
+ if (e.getMessage.contains("Path does not exist")) {
+ throw new HoodieIncrementalPathNotFoundException(e)
+ } else {
+ throw e
+ }
+ }
+
}
df
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFullTableScan.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFullTableScan.scala
index 45b9cb4c902..1d89f105331 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFullTableScan.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFullTableScan.scala
@@ -24,6 +24,7 @@ import
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN
import org.apache.hudi.common.table.timeline.{HoodieInstant,
HoodieInstantTimeGenerator, HoodieTimeline}
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.exception.HoodieIncrementalPathNotFoundException
import org.apache.hudi.testutils.HoodieSparkClientTestBase
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
import org.apache.spark.SparkException
@@ -174,7 +175,7 @@ class TestIncrementalReadWithFullTableScan extends
HoodieSparkClientTestBase {
val msg = "Should fail with Path does not exist"
tableType match {
case HoodieTableType.COPY_ON_WRITE =>
- assertThrows(classOf[AnalysisException], new Executable {
+ assertThrows(classOf[HoodieIncrementalPathNotFoundException], new
Executable {
override def execute(): Unit = {
fn()
}
diff --git
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SyncUtilHelpers.java
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SyncUtilHelpers.java
index a1c63da989b..036ca3b47a9 100644
---
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SyncUtilHelpers.java
+++
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SyncUtilHelpers.java
@@ -22,6 +22,7 @@ package org.apache.hudi.sync.common.util;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieMetaSyncException;
import org.apache.hudi.sync.common.HoodieSyncConfig;
import org.apache.hudi.sync.common.HoodieSyncTool;
@@ -30,7 +31,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Collection;
+import java.util.Map;
import java.util.Properties;
/**
@@ -58,7 +59,7 @@ public class SyncUtilHelpers {
try (HoodieSyncTool syncTool = instantiateMetaSyncTool(syncToolClassName,
props, hadoopConfig, fs, targetBasePath, baseFileFormat)) {
syncTool.syncHoodieTable();
} catch (Throwable e) {
- throw new HoodieException("Could not sync using the meta sync class " +
syncToolClassName, e);
+ throw new HoodieMetaSyncException("Could not sync using the meta sync
class " + syncToolClassName, e);
}
}
@@ -106,16 +107,18 @@ public class SyncUtilHelpers {
}
}
- public static HoodieException
getExceptionFromList(Collection<HoodieException> exceptions) {
- if (exceptions.size() == 1) {
- return exceptions.stream().findFirst().get();
+ public static HoodieException
getHoodieMetaSyncException(Map<String,HoodieException> failedMetaSyncs) {
+ if (failedMetaSyncs.size() == 1) {
+ return failedMetaSyncs.values().stream().findFirst().get();
}
StringBuilder sb = new StringBuilder();
- sb.append("Multiple exceptions during meta sync:\n");
- exceptions.forEach(e -> {
- sb.append(e.getMessage());
+ sb.append("MetaSyncs failed: {");
+ sb.append(String.join(",", failedMetaSyncs.keySet()));
+ sb.append("}\n");
+ for (String impl : failedMetaSyncs.keySet()) {
+ sb.append(failedMetaSyncs.get(impl).getMessage());
sb.append("\n");
- });
- return new HoodieException(sb.toString());
+ }
+ return new HoodieMetaSyncException(sb.toString(),failedMetaSyncs);
}
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index 16ed7eadc1f..0805c42698f 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -47,6 +47,7 @@ import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.utilities.checkpointing.InitialCheckPointProvider;
import org.apache.hudi.utilities.config.HoodieSchemaProviderConfig;
import org.apache.hudi.utilities.config.SchemaProviderPostProcessorConfig;
+import org.apache.hudi.utilities.exception.HoodieSchemaFetchException;
import org.apache.hudi.utilities.exception.HoodieSchemaPostProcessException;
import org.apache.hudi.utilities.exception.HoodieSourcePostProcessException;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
@@ -432,13 +433,11 @@ public class UtilHelpers {
/**
* Returns true if the table already exists in the JDBC database.
*/
- private static Boolean tableExists(Connection conn, Map<String, String>
options) {
+ private static Boolean tableExists(Connection conn, Map<String, String>
options) throws SQLException {
JdbcDialect dialect =
JdbcDialects.get(options.get(JDBCOptions.JDBC_URL()));
try (PreparedStatement statement =
conn.prepareStatement(dialect.getTableExistsQuery(options.get(JDBCOptions.JDBC_TABLE_NAME()))))
{
statement.setQueryTimeout(Integer.parseInt(options.get(JDBCOptions.JDBC_QUERY_TIMEOUT())));
statement.executeQuery();
- } catch (SQLException e) {
- throw new HoodieException(e);
}
return true;
}
@@ -450,13 +449,25 @@ public class UtilHelpers {
* @return
* @throws Exception
*/
- public static Schema getJDBCSchema(Map<String, String> options) throws
Exception {
- Connection conn = createConnectionFactory(options);
- String url = options.get(JDBCOptions.JDBC_URL());
- String table = options.get(JDBCOptions.JDBC_TABLE_NAME());
- boolean tableExists = tableExists(conn, options);
+ public static Schema getJDBCSchema(Map<String, String> options) {
+ Connection conn;
+ String url;
+ String table;
+ boolean tableExists;
+ try {
+ conn = createConnectionFactory(options);
+ url = options.get(JDBCOptions.JDBC_URL());
+ table = options.get(JDBCOptions.JDBC_TABLE_NAME());
+ tableExists = tableExists(conn, options);
+ } catch (Exception e) {
+ throw new HoodieSchemaFetchException("Failed to connect to jdbc", e);
+ }
- if (tableExists) {
+ if (!tableExists) {
+ throw new HoodieSchemaFetchException(String.format("%s table does not
exists!", table));
+ }
+
+ try {
JdbcDialect dialect = JdbcDialects.get(url);
try (PreparedStatement statement =
conn.prepareStatement(dialect.getSchemaQuery(table))) {
statement.setQueryTimeout(Integer.parseInt(options.get("queryTimeout")));
@@ -470,8 +481,10 @@ public class UtilHelpers {
return AvroConversionUtils.convertStructTypeToAvroSchema(structType,
table, "hoodie." + table);
}
}
- } else {
- throw new HoodieException(String.format("%s table does not exists!",
table));
+ } catch (HoodieException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new HoodieSchemaFetchException(String.format("Unable to fetch
schema from %s table", table), e);
}
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 936320a648e..8a0b4050243 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -67,6 +67,7 @@ import org.apache.hudi.config.HoodiePayloadConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieMetaSyncException;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.internal.schema.InternalSchema;
@@ -86,6 +87,8 @@ import
org.apache.hudi.utilities.callback.pulsar.HoodieWriteCommitPulsarCallback
import org.apache.hudi.utilities.config.KafkaSourceConfig;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.Config;
import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException;
+import org.apache.hudi.utilities.exception.HoodieDeltaStreamerWriteException;
+import org.apache.hudi.utilities.exception.HoodieSchemaFetchException;
import org.apache.hudi.utilities.exception.HoodieSourceTimeoutException;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.DelegatingSchemaProvider;
@@ -153,6 +156,7 @@ import static
org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_UPSERT;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC;
import static
org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC_SPEC;
import static
org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory.getKeyGeneratorClassName;
+import static
org.apache.hudi.sync.common.util.SyncUtilHelpers.getHoodieMetaSyncException;
import static org.apache.hudi.utilities.UtilHelpers.createRecordMerger;
import static
org.apache.hudi.utilities.config.HoodieDeltaStreamerConfig.MUTLI_WRITER_SOURCE_CHECKPOINT_ID;
import static
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_FORCE_SKIP_PROP;
@@ -868,12 +872,12 @@ public class DeltaSync implements Serializable, Closeable
{
case ROLLBACK_COMMIT:
LOG.info("Commit " + instantTime + " failed!");
writeClient.rollback(instantTime);
- throw new HoodieException("Error Table Commit failed!");
+ throw new HoodieDeltaStreamerWriteException("Error table commit
failed");
case LOG_ERROR:
LOG.error("Error Table write failed for instant " + instantTime);
break;
default:
- throw new HoodieException("Write failure strategy not
implemented for " + errorWriteFailureStrategy);
+ throw new HoodieDeltaStreamerWriteException("Write failure
strategy not implemented for " + errorWriteFailureStrategy);
}
}
}
@@ -892,7 +896,7 @@ public class DeltaSync implements Serializable, Closeable {
}
} else {
LOG.info("Commit " + instantTime + " failed!");
- throw new HoodieException("Commit " + instantTime + " failed!");
+ throw new HoodieDeltaStreamerWriteException("Commit " + instantTime +
" failed!");
}
} else {
LOG.error("Delta Sync found errors when writing. Errors/Total=" +
totalErrorRecords + "/" + totalRecords);
@@ -905,7 +909,7 @@ public class DeltaSync implements Serializable, Closeable {
});
// Rolling back instant
writeClient.rollback(instantTime);
- throw new HoodieException("Commit " + instantTime + " failed and
rolled-back !");
+ throw new HoodieDeltaStreamerWriteException("Commit " + instantTime + "
failed and rolled-back !");
}
long overallTimeMs = overallTimerContext != null ?
overallTimerContext.stop() : 0;
@@ -971,21 +975,20 @@ public class DeltaSync implements Serializable, Closeable
{
props.getInteger(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key())));
}
- //Collect exceptions in list because we want all sync to run. Then we
can throw
- List<HoodieException> metaSyncExceptions = new ArrayList<>();
+ Map<String,HoodieException> failedMetaSyncs = new HashMap<>();
for (String impl : syncClientToolClasses) {
+ Timer.Context syncContext = metrics.getMetaSyncTimerContext();
try {
- Timer.Context syncContext = metrics.getMetaSyncTimerContext();
SyncUtilHelpers.runHoodieMetaSync(impl.trim(), metaProps, conf, fs,
cfg.targetBasePath, cfg.baseFileFormat);
- long metaSyncTimeMs = syncContext != null ? syncContext.stop() : 0;
-
metrics.updateDeltaStreamerMetaSyncMetrics(getSyncClassShortName(impl),
metaSyncTimeMs);
- } catch (HoodieException e) {
- LOG.info("SyncTool class " + impl.trim() + " failed with exception",
e);
- metaSyncExceptions.add(e);
+ } catch (HoodieMetaSyncException e) {
+ LOG.warn("SyncTool class " + impl.trim() + " failed with exception",
e);
+ failedMetaSyncs.put(impl, e);
}
+ long metaSyncTimeMs = syncContext != null ? syncContext.stop() : 0;
+
metrics.updateDeltaStreamerMetaSyncMetrics(getSyncClassShortName(impl),
metaSyncTimeMs);
}
- if (!metaSyncExceptions.isEmpty()) {
- throw SyncUtilHelpers.getExceptionFromList(metaSyncExceptions);
+ if (!failedMetaSyncs.isEmpty()) {
+ throw getHoodieMetaSyncException(failedMetaSyncs);
}
}
}
@@ -1131,7 +1134,7 @@ public class DeltaSync implements Serializable, Closeable
{
}
return newWriteSchema;
} catch (Exception e) {
- throw new HoodieException("Failed to fetch schema from table ", e);
+ throw new HoodieSchemaFetchException("Failed to fetch schema from
table", e);
}
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieDeltaStreamerWriteException.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieDeltaStreamerWriteException.java
new file mode 100644
index 00000000000..7d57b1a2124
--- /dev/null
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieDeltaStreamerWriteException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.exception;
+
+public class HoodieDeltaStreamerWriteException extends
HoodieDeltaStreamerException {
+
+ public HoodieDeltaStreamerWriteException(String msg) {
+ super(msg);
+ }
+
+ public HoodieDeltaStreamerWriteException(String msg, Throwable e) {
+ super(msg, e);
+ }
+}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieReadFromSourceException.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieReadFromSourceException.java
new file mode 100644
index 00000000000..33c0d0d2dd2
--- /dev/null
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieReadFromSourceException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.exception;
+
+import org.apache.hudi.exception.HoodieException;
+
+public class HoodieReadFromSourceException extends HoodieException {
+
+ public HoodieReadFromSourceException(String msg) {
+ super(msg);
+ }
+
+ public HoodieReadFromSourceException(String msg, Throwable e) {
+ super(msg, e);
+ }
+}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieSchemaFetchException.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieSchemaFetchException.java
new file mode 100644
index 00000000000..d2f80e068d5
--- /dev/null
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieSchemaFetchException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.exception;
+
+/*
+ * Thrown when a schema provider is unable to fetch the schema
+ */
+public class HoodieSchemaFetchException extends HoodieSchemaProviderException {
+
+ public HoodieSchemaFetchException(String msg) {
+ super(msg);
+ }
+
+ public HoodieSchemaFetchException(String msg, Throwable e) {
+ super(msg, e);
+ }
+}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieTransformException.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieTransformException.java
new file mode 100644
index 00000000000..ecf2c9e770c
--- /dev/null
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieTransformException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.exception;
+
+import org.apache.hudi.exception.HoodieException;
+
+/**
+ * General exception thrown by a transformer.
+ */
+public class HoodieTransformException extends HoodieException {
+ public HoodieTransformException(String msg, Throwable e) {
+ super(msg, e);
+ }
+
+ public HoodieTransformException(String msg) {
+ super(msg);
+ }
+}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieTransformExecutionException.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieTransformExecutionException.java
new file mode 100644
index 00000000000..e7cf76fcc5c
--- /dev/null
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieTransformExecutionException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.exception;
+
+/**
+ * Exception that occurs during transformer execution.
+ */
+public class HoodieTransformExecutionException extends
HoodieTransformException {
+ public HoodieTransformExecutionException(String msg, Throwable e) {
+ super(msg, e);
+ }
+}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieTransformPlanException.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieTransformPlanException.java
new file mode 100644
index 00000000000..15895d3c9d6
--- /dev/null
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieTransformPlanException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.exception;
+
+/**
+ * Exception that occurs during the planning of a transformation.
+ */
+public class HoodieTransformPlanException extends HoodieTransformException {
+
+ public HoodieTransformPlanException(String msg) {
+ super(msg);
+ }
+
+ public HoodieTransformPlanException(String msg, Throwable e) {
+ super(msg, e);
+ }
+}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java
index 9956e1528a4..92666197ca6 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java
@@ -22,8 +22,8 @@ import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.FileIOUtils;
-import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.utilities.config.FilebasedSchemaProviderConfig;
+import org.apache.hudi.utilities.exception.HoodieSchemaProviderException;
import org.apache.hudi.utilities.sources.helpers.SanitizationUtils;
import org.apache.avro.Schema;
@@ -78,7 +78,7 @@ public class FilebasedSchemaProvider extends SchemaProvider {
try (FSDataInputStream in = fs.open(new Path(schemaPath))) {
schemaStr = FileIOUtils.readAsUTFString(in);
} catch (IOException ioe) {
- throw new HoodieIOException(String.format("Error reading schema from
file %s", schemaPath), ioe);
+ throw new HoodieSchemaProviderException(String.format("Error reading
schema from file %s", schemaPath), ioe);
}
return SanitizationUtils.parseAvroSchema(schemaStr, sanitizeSchema,
invalidCharMask);
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/HiveSchemaProvider.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/HiveSchemaProvider.java
index e6b113b2782..dd8b4de944c 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/HiveSchemaProvider.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/HiveSchemaProvider.java
@@ -23,7 +23,7 @@ import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.config.HiveSchemaProviderConfig;
-import org.apache.hudi.utilities.exception.HoodieSchemaProviderException;
+import org.apache.hudi.utilities.exception.HoodieSchemaFetchException;
import org.apache.avro.Schema;
import org.apache.spark.api.java.JavaSparkContext;
@@ -59,7 +59,7 @@ public class HiveSchemaProvider extends SchemaProvider {
sourceSchemaTableName,
"hoodie." + sourceSchemaDatabaseName);
} catch (NoSuchTableException | NoSuchDatabaseException e) {
- throw new HoodieSchemaProviderException(String.format("Can't find Hive
table: %s.%s", sourceSchemaDatabaseName, sourceSchemaTableName), e);
+ throw new HoodieSchemaFetchException(String.format("Can't find Hive
table: %s.%s", sourceSchemaDatabaseName, sourceSchemaTableName), e);
}
// target schema
@@ -74,7 +74,7 @@ public class HiveSchemaProvider extends SchemaProvider {
targetSchemaTableName,
"hoodie." + targetSchemaDatabaseName);
} catch (NoSuchDatabaseException | NoSuchTableException e) {
- throw new HoodieSchemaProviderException(String.format("Can't find Hive
table: %s.%s", targetSchemaDatabaseName, targetSchemaTableName), e);
+ throw new HoodieSchemaFetchException(String.format("Can't find Hive
table: %s.%s", targetSchemaDatabaseName, targetSchemaTableName), e);
}
}
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/JdbcbasedSchemaProvider.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/JdbcbasedSchemaProvider.java
index c548db3650f..722d2da551a 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/JdbcbasedSchemaProvider.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/JdbcbasedSchemaProvider.java
@@ -19,7 +19,6 @@
package org.apache.hudi.utilities.schema;
import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.config.JdbcbasedSchemaProviderConfig;
@@ -55,11 +54,6 @@ public class JdbcbasedSchemaProvider extends SchemaProvider {
return sourceSchema;
}
- try {
- sourceSchema = UtilHelpers.getJDBCSchema(options);
- } catch (Exception e) {
- throw new HoodieException("Failed to get Schema through jdbc. ", e);
- }
- return sourceSchema;
+ return UtilHelpers.getJDBCSchema(options);
}
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/KafkaOffsetPostProcessor.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/KafkaOffsetPostProcessor.java
index dce1db3afc1..5b0e39bc018 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/KafkaOffsetPostProcessor.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/KafkaOffsetPostProcessor.java
@@ -20,6 +20,7 @@ package org.apache.hudi.utilities.schema;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.internal.schema.HoodieSchemaException;
import org.apache.hudi.utilities.config.HoodieDeltaStreamerConfig;
import org.apache.avro.Schema;
@@ -59,13 +60,18 @@ public class KafkaOffsetPostProcessor extends
SchemaPostProcessor {
@Override
public Schema processSchema(Schema schema) {
// this method adds kafka offset fields namely source offset, partition
and timestamp to the schema of the batch.
- List<Schema.Field> fieldList = schema.getFields();
- List<Schema.Field> newFieldList = fieldList.stream()
- .map(f -> new Schema.Field(f.name(), f.schema(), f.doc(),
f.defaultVal())).collect(Collectors.toList());
- newFieldList.add(new Schema.Field(KAFKA_SOURCE_OFFSET_COLUMN,
Schema.create(Schema.Type.LONG), "offset column", 0));
- newFieldList.add(new Schema.Field(KAFKA_SOURCE_PARTITION_COLUMN,
Schema.create(Schema.Type.INT), "partition column", 0));
- newFieldList.add(new Schema.Field(KAFKA_SOURCE_TIMESTAMP_COLUMN,
Schema.create(Schema.Type.LONG), "timestamp column", 0));
- Schema newSchema = Schema.createRecord(schema.getName() + "_processed",
schema.getDoc(), schema.getNamespace(), false, newFieldList);
- return newSchema;
+ try {
+ List<Schema.Field> fieldList = schema.getFields();
+ List<Schema.Field> newFieldList = fieldList.stream()
+ .map(f -> new Schema.Field(f.name(), f.schema(), f.doc(),
f.defaultVal())).collect(Collectors.toList());
+ newFieldList.add(new Schema.Field(KAFKA_SOURCE_OFFSET_COLUMN,
Schema.create(Schema.Type.LONG), "offset column", 0));
+ newFieldList.add(new Schema.Field(KAFKA_SOURCE_PARTITION_COLUMN,
Schema.create(Schema.Type.INT), "partition column", 0));
+ newFieldList.add(new Schema.Field(KAFKA_SOURCE_TIMESTAMP_COLUMN,
Schema.create(Schema.Type.LONG), "timestamp column", 0));
+ Schema newSchema = Schema.createRecord(schema.getName() + "_processed",
schema.getDoc(), schema.getNamespace(), false, newFieldList);
+ return newSchema;
+ } catch (Exception e) {
+ throw new HoodieSchemaException("Kafka offset post processor failed with
schema: " + schema, e);
+ }
+
}
}
\ No newline at end of file
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/ProtoClassBasedSchemaProvider.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/ProtoClassBasedSchemaProvider.java
index 92eca1f6a89..ac7eb811c43 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/ProtoClassBasedSchemaProvider.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/ProtoClassBasedSchemaProvider.java
@@ -84,8 +84,13 @@ public class ProtoClassBasedSchemaProvider extends
SchemaProvider {
@Override
public Schema getSourceSchema() {
if (schema == null) {
- Schema.Parser parser = new Schema.Parser();
- schema = parser.parse(schemaString);
+ try {
+ Schema.Parser parser = new Schema.Parser();
+ schema = parser.parse(schemaString);
+ } catch (Exception e) {
+ throw new HoodieSchemaException("Failed to parse schema: " +
schemaString, e);
+ }
+
}
return schema;
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/RowBasedSchemaProvider.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/RowBasedSchemaProvider.java
index 8fe8e6c334a..29504c01c00 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/RowBasedSchemaProvider.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/RowBasedSchemaProvider.java
@@ -44,7 +44,6 @@ public class RowBasedSchemaProvider extends SchemaProvider {
@Override
public Schema getSourceSchema() {
- return AvroConversionUtils.convertStructTypeToAvroSchema(rowStruct,
HOODIE_RECORD_STRUCT_NAME,
- HOODIE_RECORD_NAMESPACE);
+ return AvroConversionUtils.convertStructTypeToAvroSchema(rowStruct,
HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE);
}
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
index b26be1b421b..ca779260093 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
@@ -21,8 +21,9 @@ package org.apache.hudi.utilities.schema;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.ReflectionUtils;
-import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.internal.schema.HoodieSchemaException;
import org.apache.hudi.utilities.config.HoodieSchemaProviderConfig;
+import org.apache.hudi.utilities.exception.HoodieSchemaFetchException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -89,12 +90,16 @@ public class SchemaRegistryProvider extends SchemaProvider {
String convert(String schema) throws IOException;
}
- public Schema parseSchemaFromRegistry(String registryUrl) throws IOException
{
+ public Schema parseSchemaFromRegistry(String registryUrl) {
String schema = fetchSchemaFromRegistry(registryUrl);
- SchemaConverter converter =
config.containsKey(HoodieSchemaProviderConfig.SCHEMA_CONVERTER.key())
- ?
ReflectionUtils.loadClass(config.getString(HoodieSchemaProviderConfig.SCHEMA_CONVERTER.key()))
- : s -> s;
- return new Schema.Parser().parse(converter.convert(schema));
+ try {
+ SchemaConverter converter =
config.containsKey(HoodieSchemaProviderConfig.SCHEMA_CONVERTER.key())
+ ?
ReflectionUtils.loadClass(config.getString(HoodieSchemaProviderConfig.SCHEMA_CONVERTER.key()))
+ : s -> s;
+ return new Schema.Parser().parse(converter.convert(schema));
+ } catch (Exception e) {
+ throw new HoodieSchemaException("Failed to parse schema from registry: "
+ schema, e);
+ }
}
/**
@@ -105,22 +110,25 @@ public class SchemaRegistryProvider extends
SchemaProvider {
*
* @param registryUrl
* @return the Schema in String form.
- * @throws IOException
*/
- public String fetchSchemaFromRegistry(String registryUrl) throws IOException
{
- HttpURLConnection connection;
- Matcher matcher = Pattern.compile("://(.*?)@").matcher(registryUrl);
- if (matcher.find()) {
- String creds = matcher.group(1);
- String urlWithoutCreds = registryUrl.replace(creds + "@", "");
- connection = getConnection(urlWithoutCreds);
- setAuthorizationHeader(matcher.group(1), connection);
- } else {
- connection = getConnection(registryUrl);
+ public String fetchSchemaFromRegistry(String registryUrl) {
+ try {
+ HttpURLConnection connection;
+ Matcher matcher = Pattern.compile("://(.*?)@").matcher(registryUrl);
+ if (matcher.find()) {
+ String creds = matcher.group(1);
+ String urlWithoutCreds = registryUrl.replace(creds + "@", "");
+ connection = getConnection(urlWithoutCreds);
+ setAuthorizationHeader(matcher.group(1), connection);
+ } else {
+ connection = getConnection(registryUrl);
+ }
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode node = mapper.readTree(getStream(connection));
+ return node.get("schema").asText();
+ } catch (Exception e) {
+ throw new HoodieSchemaFetchException("Failed to fetch schema from
registry", e);
}
- ObjectMapper mapper = new ObjectMapper();
- JsonNode node = mapper.readTree(getStream(connection));
- return node.get("schema").asText();
}
private SSLSocketFactory sslSocketFactory;
@@ -182,8 +190,8 @@ public class SchemaRegistryProvider extends SchemaProvider {
String registryUrl =
config.getString(HoodieSchemaProviderConfig.SRC_SCHEMA_REGISTRY_URL.key());
try {
return parseSchemaFromRegistry(registryUrl);
- } catch (IOException ioe) {
- throw new HoodieIOException("Error reading source schema from registry
:" + registryUrl, ioe);
+ } catch (Exception e) {
+ throw new HoodieSchemaFetchException("Error reading source schema from
registry :" + registryUrl, e);
}
}
@@ -193,8 +201,8 @@ public class SchemaRegistryProvider extends SchemaProvider {
String targetRegistryUrl =
config.getString(HoodieSchemaProviderConfig.TARGET_SCHEMA_REGISTRY_URL.key(),
registryUrl);
try {
return parseSchemaFromRegistry(targetRegistryUrl);
- } catch (IOException ioe) {
- throw new HoodieIOException("Error reading target schema from registry
:" + registryUrl, ioe);
+ } catch (Exception e) {
+ throw new HoodieSchemaFetchException("Error reading target schema from
registry :" + targetRegistryUrl, e);
}
}
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
index fd195621fd7..467b77c5adb 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
@@ -20,10 +20,9 @@ package org.apache.hudi.utilities.sources;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer;
+import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.AvroConvertor;
@@ -71,14 +70,14 @@ public class AvroKafkaSource extends
KafkaSource<GenericRecord> {
props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP,
Class.forName(deserializerClassName).getName());
if
(deserializerClassName.equals(KafkaAvroSchemaDeserializer.class.getName())) {
if (schemaProvider == null) {
- throw new HoodieIOException("SchemaProvider has to be set to use
KafkaAvroSchemaDeserializer");
+ throw new HoodieReadFromSourceException("SchemaProvider has to be
set to use KafkaAvroSchemaDeserializer");
}
props.put(KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA,
schemaProvider.getSourceSchema().toString());
}
} catch (ClassNotFoundException e) {
String error = "Could not load custom avro kafka deserializer: " +
deserializerClassName;
LOG.error(error);
- throw new HoodieException(error, e);
+ throw new HoodieReadFromSourceException(error, e);
}
this.offsetGen = new KafkaOffsetGen(props);
}
@@ -88,7 +87,7 @@ public class AvroKafkaSource extends
KafkaSource<GenericRecord> {
JavaRDD<ConsumerRecord<Object, Object>> kafkaRDD;
if (deserializerClassName.equals(ByteArrayDeserializer.class.getName())) {
if (schemaProvider == null) {
- throw new HoodieException("Please provide a valid schema provider
class when use ByteArrayDeserializer!");
+ throw new HoodieReadFromSourceException("Please provide a valid schema
provider class when use ByteArrayDeserializer!");
}
//Don't want kafka offsets here so we use originalSchemaProvider
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsSource.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsSource.java
index 93fe9fa4cfe..20538f9f944 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsSource.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsSource.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.gcs.MessageBatch;
import org.apache.hudi.utilities.sources.helpers.gcs.MessageValidity;
@@ -127,8 +128,14 @@ public class GcsEventsSource extends RowSource {
@Override
protected Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String>
lastCkptStr, long sourceLimit) {
LOG.info("fetchNextBatch(): Input checkpoint: " + lastCkptStr);
-
- MessageBatch messageBatch = fetchFileMetadata();
+ MessageBatch messageBatch;
+ try {
+ messageBatch = fetchFileMetadata();
+ } catch (HoodieException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new HoodieReadFromSourceException("Failed to fetch file metadata
from GCS events source", e);
+ }
if (messageBatch.isEmpty()) {
LOG.info("No new data. Returning empty batch with checkpoint value: " +
CHECKPOINT_VALUE_ZERO);
@@ -197,7 +204,7 @@ public class GcsEventsSource extends RowSource {
pubsubMessagesFetcher.sendAcks(messagesToAck);
messagesToAck.clear();
} catch (IOException e) {
- throw new HoodieException("Error when acknowledging messages from
Pubsub", e);
+ throw new HoodieReadFromSourceException("Error when acknowledging
messages from Pubsub", e);
}
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HiveIncrPullSource.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HiveIncrPullSource.java
index 6083046eef1..529aa0dd655 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HiveIncrPullSource.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HiveIncrPullSource.java
@@ -22,9 +22,9 @@ import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.utilities.HiveIncrementalPuller;
import org.apache.hudi.utilities.config.HiveIncrPullSourceConfig;
+import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.avro.generic.GenericRecord;
@@ -132,8 +132,8 @@ public class HiveIncrPullSource extends AvroSource {
sparkContext.setJobGroup(this.getClass().getSimpleName(), "Fetch new
data");
return new InputBatch<>(Option.of(avroRDD.keys().map(r ->
((GenericRecord) r.datum()))),
String.valueOf(commitToPull.get()));
- } catch (IOException ioe) {
- throw new HoodieIOException("Unable to read from source from checkpoint:
" + lastCheckpointStr, ioe);
+ } catch (Exception e) {
+ throw new HoodieReadFromSourceException("Unable to read from source from
checkpoint: " + lastCheckpointStr, e);
}
}
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JdbcSource.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JdbcSource.java
index ae0221c8095..92084e0733f 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JdbcSource.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JdbcSource.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.utilities.SqlQueryBuilder;
import org.apache.hudi.utilities.config.JdbcSourceConfig;
+import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -256,7 +257,7 @@ public class JdbcSource extends RowSource {
}
} catch (Exception e) {
LOG.error("Failed to checkpoint");
- throw new HoodieException("Failed to checkpoint. Last checkpoint: " +
lastCkptStr.orElse(null), e);
+ throw new HoodieReadFromSourceException("Failed to checkpoint. Last
checkpoint: " + lastCkptStr.orElse(null), e);
}
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java
index 1f6653e9655..7c90f7a0daa 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java
@@ -21,8 +21,8 @@ package org.apache.hudi.utilities.sources;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.ReflectionUtils;
-import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig;
+import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
@@ -59,7 +59,7 @@ public class ProtoKafkaSource extends KafkaSource<Message> {
className =
props.getString(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME.key());
this.offsetGen = new KafkaOffsetGen(props);
if (this.shouldAddOffsets) {
- throw new HoodieException("Appending kafka offsets to ProtoKafkaSource
is not supported");
+ throw new HoodieReadFromSourceException("Appending kafka offsets to
ProtoKafkaSource is not supported");
}
}
@@ -83,7 +83,7 @@ public class ProtoKafkaSource extends KafkaSource<Message> {
try {
return (Message) getParseMethod().invoke(getClass(), bytes);
} catch (IllegalAccessException | InvocationTargetException ex) {
- throw new HoodieException("Failed to parse proto message from kafka",
ex);
+ throw new HoodieReadFromSourceException("Failed to parse proto message
from kafka", ex);
}
}
@@ -99,7 +99,7 @@ public class ProtoKafkaSource extends KafkaSource<Message> {
try {
parseMethod = getProtoClass().getMethod("parseFrom", byte[].class);
} catch (NoSuchMethodException ex) {
- throw new HoodieException("Unable to get proto parsing method from
specified class: " + className, ex);
+ throw new HoodieReadFromSourceException("Unable to get proto parsing
method from specified class: " + className, ex);
}
}
return parseMethod;
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java
index b1686c3fbe4..2a75247dc17 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java
@@ -27,6 +27,7 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.util.Lazy;
import org.apache.hudi.utilities.config.PulsarSourceConfig;
+import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.pulsar.client.api.Consumer;
@@ -179,7 +180,7 @@ public class PulsarSource extends RowSource implements
Closeable {
pulsarConsumer.get().acknowledgeCumulative(latestConsumedOffset);
} catch (PulsarClientException e) {
LOG.error(String.format("Failed to ack messageId (%s) for topic '%s'",
latestConsumedOffset, topicName), e);
- throw new HoodieIOException("Failed to ack message for topic", e);
+ throw new HoodieReadFromSourceException("Failed to ack message for
topic", e);
}
}
@@ -188,7 +189,7 @@ public class PulsarSource extends RowSource implements
Closeable {
return pulsarConsumer.get().getLastMessageId();
} catch (PulsarClientException e) {
LOG.error(String.format("Failed to fetch latest messageId for topic
'%s'", topicName), e);
- throw new HoodieIOException("Failed to fetch latest messageId for
topic", e);
+ throw new HoodieReadFromSourceException("Failed to fetch latest
messageId for topic", e);
}
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java
index 20454bd80ef..efdbbc06a4f 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java
@@ -23,9 +23,9 @@ import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.utilities.config.HoodieSchemaProviderConfig;
import org.apache.hudi.utilities.config.KafkaSourceConfig;
+import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.schema.SchemaRegistryProvider;
@@ -53,7 +53,6 @@ import org.apache.spark.streaming.kafka010.OffsetRange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
@@ -92,7 +91,7 @@ public abstract class DebeziumSource extends RowSource {
} catch (ClassNotFoundException e) {
String error = "Could not load custom avro kafka deserializer: " +
deserializerClassName;
LOG.error(error);
- throw new HoodieException(error, e);
+ throw new HoodieReadFromSourceException(error, e);
}
// Currently, debezium source requires Confluent/Kafka schema-registry to
fetch the latest schema.
@@ -125,9 +124,9 @@ public abstract class DebeziumSource extends RowSource {
LOG.info(String.format("Spark schema of Kafka Payload for topic
%s:\n%s", offsetGen.getTopicName(), dataset.schema().treeString()));
LOG.info(String.format("New checkpoint string: %s",
CheckpointUtils.offsetsToStr(offsetRanges)));
return Pair.of(Option.of(dataset), overrideCheckpointStr.isEmpty() ?
CheckpointUtils.offsetsToStr(offsetRanges) : overrideCheckpointStr);
- } catch (IOException exc) {
- LOG.error("Fatal error reading and parsing incoming debezium event",
exc);
- throw new HoodieException("Fatal error reading and parsing incoming
debezium event", exc);
+ } catch (Exception e) {
+ LOG.error("Fatal error reading and parsing incoming debezium event",
e);
+ throw new HoodieReadFromSourceException("Fatal error reading and
parsing incoming debezium event", e);
}
}
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java
index 65aa7346c7d..84a50b24a28 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java
@@ -19,6 +19,7 @@
package org.apache.hudi.utilities.sources.helpers;
import org.apache.hudi.avro.MercifulJsonConverter;
+import org.apache.hudi.internal.schema.HoodieSchemaException;
import com.google.protobuf.Message;
import com.twitter.bijection.Injection;
@@ -29,6 +30,7 @@ import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.io.Serializable;
+import java.util.Arrays;
import scala.util.Either;
import scala.util.Left;
@@ -108,9 +110,17 @@ public class AvroConvertor implements Serializable {
}
public GenericRecord fromJson(String json) {
- initSchema();
- initJsonConvertor();
- return jsonConverter.convert(json, schema);
+ try {
+ initSchema();
+ initJsonConvertor();
+ return jsonConverter.convert(json, schema);
+ } catch (Exception e) {
+ if (json != null) {
+ throw new HoodieSchemaException("Failed to convert schema from json to
avro: " + json, e);
+ } else {
+ throw new HoodieSchemaException("Failed to convert schema from json to
avro. Schema string was null.", e);
+ }
+ }
}
public Either<GenericRecord,String> fromJsonWithError(String json) {
@@ -124,18 +134,35 @@ public class AvroConvertor implements Serializable {
}
public Schema getSchema() {
- return new Schema.Parser().parse(schemaStr);
+ try {
+ return new Schema.Parser().parse(schemaStr);
+ } catch (Exception e) {
+ throw new HoodieSchemaException("Failed to parse json schema: " +
schemaStr, e);
+ }
}
public GenericRecord fromAvroBinary(byte[] avroBinary) {
- initSchema();
- initInjection();
- return recordInjection.invert(avroBinary).get();
+ try {
+ initSchema();
+ initInjection();
+ return recordInjection.invert(avroBinary).get();
+ } catch (Exception e) {
+ if (avroBinary != null) {
+ throw new HoodieSchemaException("Failed to get avro schema from avro
binary: " + Arrays.toString(avroBinary), e);
+ } else {
+ throw new HoodieSchemaException("Failed to get avro schema from avro
binary. Binary is null", e);
+ }
+ }
+
}
public GenericRecord fromProtoMessage(Message message) {
- initSchema();
- return ProtoConversionUtil.convertToAvro(schema, message);
+ try {
+ initSchema();
+ return ProtoConversionUtil.convertToAvro(schema, message);
+ } catch (Exception e) {
+      throw new HoodieSchemaException("Failed to convert proto message to avro record", e);
+ }
}
/**
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java
index cbc66ff4be7..8945802355d 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java
@@ -22,8 +22,8 @@ import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
+import org.apache.hudi.utilities.exception.HoodieTransformPlanException;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
@@ -72,7 +72,7 @@ public class ChainedTransformer implements Transformer {
} else {
String[] splits =
configuredTransformer.split(ID_TRANSFORMER_CLASS_NAME_DELIMITER);
if (splits.length > 2) {
- throw new IllegalArgumentException("There should only be one colon
in a configured transformer");
+ throw new HoodieTransformPlanException("There should only be one
colon in a configured transformer");
}
String id = splits[0];
validateIdentifier(id, identifiers, configuredTransformer);
@@ -80,10 +80,10 @@ public class ChainedTransformer implements Transformer {
transformers.add(new TransformerInfo(transformer, id));
}
}
-
-
ValidationUtils.checkArgument(transformers.stream().allMatch(TransformerInfo::hasIdentifier)
- || transformers.stream().noneMatch(TransformerInfo::hasIdentifier),
- "Either all transformers should have identifier or none should");
+ if (!(transformers.stream().allMatch(TransformerInfo::hasIdentifier)
+ || transformers.stream().noneMatch(TransformerInfo::hasIdentifier))) {
+ throw new HoodieTransformPlanException("Either all transformers should
have identifier or none should");
+ }
}
public List<String> getTransformersNames() {
@@ -101,9 +101,11 @@ public class ChainedTransformer implements Transformer {
}
private void validateIdentifier(String id, Set<String> identifiers, String
configuredTransformer) {
- ValidationUtils.checkArgument(StringUtils.nonEmpty(id),
String.format("Transformer identifier is empty for %s", configuredTransformer));
+ if (StringUtils.isNullOrEmpty(id)) {
+ throw new HoodieTransformPlanException(String.format("Transformer
identifier is empty for %s", configuredTransformer));
+ }
if (identifiers.contains(id)) {
- throw new IllegalArgumentException(String.format("Duplicate identifier
%s found for transformer %s", id, configuredTransformer));
+ throw new HoodieTransformPlanException(String.format("Duplicate
identifier %s found for transformer %s", id, configuredTransformer));
} else {
identifiers.add(id);
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/FlatteningTransformer.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/FlatteningTransformer.java
index af5d3a44b06..1256491528d 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/FlatteningTransformer.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/FlatteningTransformer.java
@@ -19,6 +19,7 @@
package org.apache.hudi.utilities.transform;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.utilities.exception.HoodieTransformExecutionException;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
@@ -45,14 +46,17 @@ public class FlatteningTransformer implements Transformer {
@Override
public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession,
Dataset<Row> rowDataset,
TypedProperties properties) {
-
- // tmp table name doesn't like dashes
- String tmpTable =
TMP_TABLE.concat(UUID.randomUUID().toString().replace("-", "_"));
- LOG.info("Registering tmp table : " + tmpTable);
- rowDataset.createOrReplaceTempView(tmpTable);
- Dataset<Row> transformed = sparkSession.sql("select " +
flattenSchema(rowDataset.schema(), null) + " from " + tmpTable);
- sparkSession.catalog().dropTempView(tmpTable);
- return transformed;
+ try {
+ // tmp table name doesn't like dashes
+ String tmpTable =
TMP_TABLE.concat(UUID.randomUUID().toString().replace("-", "_"));
+ LOG.info("Registering tmp table : " + tmpTable);
+ rowDataset.createOrReplaceTempView(tmpTable);
+ Dataset<Row> transformed = sparkSession.sql("select " +
flattenSchema(rowDataset.schema(), null) + " from " + tmpTable);
+ sparkSession.catalog().dropTempView(tmpTable);
+ return transformed;
+ } catch (Exception e) {
+ throw new HoodieTransformExecutionException("Failed to apply flattening
transformer", e);
+ }
}
public String flattenSchema(StructType schema, String prefix) {
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlFileBasedTransformer.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlFileBasedTransformer.java
index 2a58c498b46..98f1ce482b8 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlFileBasedTransformer.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlFileBasedTransformer.java
@@ -20,8 +20,9 @@ package org.apache.hudi.utilities.transform;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.utilities.config.SqlTransformerConfig;
+import org.apache.hudi.utilities.exception.HoodieTransformException;
+import org.apache.hudi.utilities.exception.HoodieTransformExecutionException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -70,7 +71,7 @@ public class SqlFileBasedTransformer implements Transformer {
final String sqlFile =
props.getString(SqlTransformerConfig.TRANSFORMER_SQL_FILE.key());
if (null == sqlFile) {
- throw new IllegalArgumentException(
+ throw new HoodieTransformException(
"Missing required configuration : (" +
SqlTransformerConfig.TRANSFORMER_SQL_FILE.key() + ")");
}
@@ -96,7 +97,7 @@ public class SqlFileBasedTransformer implements Transformer {
}
return rows;
} catch (final IOException ioe) {
- throw new HoodieIOException("Error reading transformer SQL file.", ioe);
+ throw new HoodieTransformExecutionException("Error reading transformer
SQL file.", ioe);
} finally {
sparkSession.catalog().dropTempView(tmpTable);
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlQueryBasedTransformer.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlQueryBasedTransformer.java
index aaabe3dce5a..c9b410121c7 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlQueryBasedTransformer.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlQueryBasedTransformer.java
@@ -20,6 +20,8 @@ package org.apache.hudi.utilities.transform;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.config.SqlTransformerConfig;
+import org.apache.hudi.utilities.exception.HoodieTransformException;
+import org.apache.hudi.utilities.exception.HoodieTransformExecutionException;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
@@ -47,17 +49,21 @@ public class SqlQueryBasedTransformer implements
Transformer {
TypedProperties properties) {
String transformerSQL =
properties.getString(SqlTransformerConfig.TRANSFORMER_SQL.key());
if (null == transformerSQL) {
- throw new IllegalArgumentException("Missing configuration : (" +
SqlTransformerConfig.TRANSFORMER_SQL.key() + ")");
+ throw new HoodieTransformException("Missing configuration : (" +
SqlTransformerConfig.TRANSFORMER_SQL.key() + ")");
}
- // tmp table name doesn't like dashes
- String tmpTable =
TMP_TABLE.concat(UUID.randomUUID().toString().replace("-", "_"));
- LOG.info("Registering tmp table : " + tmpTable);
- rowDataset.createOrReplaceTempView(tmpTable);
- String sqlStr = transformerSQL.replaceAll(SRC_PATTERN, tmpTable);
- LOG.debug("SQL Query for transformation : (" + sqlStr + ")");
- Dataset<Row> transformed = sparkSession.sql(sqlStr);
- sparkSession.catalog().dropTempView(tmpTable);
- return transformed;
+ try {
+ // tmp table name doesn't like dashes
+ String tmpTable =
TMP_TABLE.concat(UUID.randomUUID().toString().replace("-", "_"));
+ LOG.info("Registering tmp table : " + tmpTable);
+ rowDataset.createOrReplaceTempView(tmpTable);
+ String sqlStr = transformerSQL.replaceAll(SRC_PATTERN, tmpTable);
+ LOG.debug("SQL Query for transformation : (" + sqlStr + ")");
+ Dataset<Row> transformed = sparkSession.sql(sqlStr);
+ sparkSession.catalog().dropTempView(tmpTable);
+ return transformed;
+ } catch (Exception e) {
+ throw new HoodieTransformExecutionException("Failed to apply sql query
based transformer", e);
+ }
}
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index f6b70356ec2..77da8bfdb12 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -57,6 +57,7 @@ import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIncrementalPathNotFoundException;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HoodieHiveSyncClient;
@@ -2415,7 +2416,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
insertInTable(tableBasePath, 9, WriteOperationType.UPSERT);
//No change as this fails with Path not exist error
- assertThrows(org.apache.spark.sql.AnalysisException.class, () -> new
HoodieDeltaStreamer(downstreamCfg, jsc).sync());
+ assertThrows(HoodieIncrementalPathNotFoundException.class, () -> new
HoodieDeltaStreamer(downstreamCfg, jsc).sync());
TestHelpers.assertRecordCount(1000, downstreamTableBasePath, sqlContext);
if (downstreamCfg.configs == null) {
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/TestMultipleMetaSync.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/TestMultipleMetaSync.java
index 6c3caf72693..4a47cd3c64b 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/TestMultipleMetaSync.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/TestMultipleMetaSync.java
@@ -19,7 +19,7 @@
package org.apache.hudi.utilities.deltastreamer.multisync;
import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieMetaSyncException;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerTestBase;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
@@ -61,7 +61,7 @@ public class TestMultipleMetaSync extends
HoodieDeltaStreamerTestBase {
MockSyncTool1.syncSuccess = false;
MockSyncTool2.syncSuccess = false;
HoodieDeltaStreamer.Config cfg = getConfig(tableBasePath, syncClassNames);
- Exception e = assertThrows(HoodieException.class, () -> new
HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync());
+ Exception e = assertThrows(HoodieMetaSyncException.class, () -> new
HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync());
assertTrue(e.getMessage().contains(MockSyncToolException1.class.getName()));
assertTrue(MockSyncTool1.syncSuccess);
assertTrue(MockSyncTool2.syncSuccess);
@@ -73,7 +73,7 @@ public class TestMultipleMetaSync extends
HoodieDeltaStreamerTestBase {
MockSyncTool1.syncSuccess = false;
MockSyncTool2.syncSuccess = false;
HoodieDeltaStreamer.Config cfg = getConfig(tableBasePath,
getSyncNames("MockSyncTool1", "MockSyncTool2", "MockSyncToolException1",
"MockSyncToolException2"));
- Exception e = assertThrows(HoodieException.class, () -> new
HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync());
+ Exception e = assertThrows(HoodieMetaSyncException.class, () -> new
HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync());
assertTrue(e.getMessage().contains(MockSyncToolException1.class.getName()));
assertTrue(e.getMessage().contains(MockSyncToolException2.class.getName()));
assertTrue(MockSyncTool1.syncSuccess);
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestChainedTransformer.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestChainedTransformer.java
index 26768324946..80a2f8bc6e4 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestChainedTransformer.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestChainedTransformer.java
@@ -20,6 +20,7 @@
package org.apache.hudi.utilities.functional;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+import org.apache.hudi.utilities.exception.HoodieTransformPlanException;
import org.apache.hudi.utilities.transform.ChainedTransformer;
import org.apache.hudi.utilities.transform.Transformer;
@@ -87,7 +88,7 @@ public class TestChainedTransformer extends
SparkClientFunctionalTestHarness {
ChainedTransformer transformer = new
ChainedTransformer(Arrays.asList(transformerName.split(",")));
fail();
} catch (Exception e) {
- assertTrue(e instanceof IllegalArgumentException, e.getMessage());
+ assertTrue(e instanceof HoodieTransformPlanException, e.getMessage());
}
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestErrorTableAwareChainedTransformer.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestErrorTableAwareChainedTransformer.java
index d86ce15113f..e9f0ee7ebea 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestErrorTableAwareChainedTransformer.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestErrorTableAwareChainedTransformer.java
@@ -22,6 +22,7 @@ package org.apache.hudi.utilities.functional;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+import org.apache.hudi.utilities.exception.HoodieTransformException;
import org.apache.hudi.utilities.transform.ErrorTableAwareChainedTransformer;
import org.apache.hudi.utilities.transform.Transformer;
import org.apache.spark.sql.Column;
@@ -132,7 +133,7 @@ public class TestErrorTableAwareChainedTransformer extends
SparkClientFunctional
ErrorTableAwareChainedTransformer transformer = new
ErrorTableAwareChainedTransformer(Arrays.asList(transformerName.split(",")));
fail();
} catch (Exception e) {
- assertTrue(e instanceof IllegalArgumentException, e.getMessage());
+ assertTrue(e instanceof HoodieTransformException, e.getMessage());
}
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java
index fadaa3fd217..13816351849 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java
@@ -44,7 +44,6 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
-import java.io.IOException;
import java.util.UUID;
import java.util.stream.Stream;
@@ -170,7 +169,7 @@ public abstract class TestAbstractDebeziumSource extends
UtilitiesTestBase {
}
@Override
- public String fetchSchemaFromRegistry(String registryUrl) throws
IOException {
+ public String fetchSchemaFromRegistry(String registryUrl) {
return schema;
}
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlFileBasedTransformer.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlFileBasedTransformer.java
index 1c9152755ca..7f9fe7e1e0e 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlFileBasedTransformer.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlFileBasedTransformer.java
@@ -20,7 +20,7 @@
package org.apache.hudi.utilities.transform;
import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.utilities.exception.HoodieTransformException;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.spark.sql.Dataset;
@@ -101,7 +101,7 @@ public class TestSqlFileBasedTransformer extends
UtilitiesTestBase {
"hoodie.deltastreamer.transformer.sql.file",
UtilitiesTestBase.basePath + "/non-exist-sql-file.sql");
assertThrows(
- HoodieIOException.class,
+ HoodieTransformException.class,
() -> sqlFileTransformer.apply(jsc, sparkSession, inputDatasetRows,
props));
}