This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 31192242cea [HUDI-6261] Make deltastreamer exceptions more descriptive 
(#8638)
31192242cea is described below

commit 31192242cea06c08ac00f537cb87fe547fe18ec9
Author: Jon Vexler <[email protected]>
AuthorDate: Sat May 27 14:25:04 2023 -0400

    [HUDI-6261] Make deltastreamer exceptions more descriptive (#8638)
---
 .../org/apache/hudi/AvroConversionUtils.scala      | 21 +++++---
 .../HoodieIncrementalPathNotFoundException.java    | 33 +++++++++++++
 .../hudi/exception/HoodieMetaSyncException.java    | 39 +++++++++++++++
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  9 ++--
 .../org/apache/hudi/IncrementalRelation.scala      | 32 ++++++++-----
 .../TestIncrementalReadWithFullTableScan.scala     |  3 +-
 .../hudi/sync/common/util/SyncUtilHelpers.java     | 23 +++++----
 .../org/apache/hudi/utilities/UtilHelpers.java     | 35 +++++++++-----
 .../hudi/utilities/deltastreamer/DeltaSync.java    | 33 +++++++------
 .../HoodieDeltaStreamerWriteException.java         | 30 ++++++++++++
 .../exception/HoodieReadFromSourceException.java   | 32 +++++++++++++
 .../exception/HoodieSchemaFetchException.java      | 33 +++++++++++++
 .../exception/HoodieTransformException.java        | 34 +++++++++++++
 .../HoodieTransformExecutionException.java         | 28 +++++++++++
 .../exception/HoodieTransformPlanException.java    | 33 +++++++++++++
 .../utilities/schema/FilebasedSchemaProvider.java  |  4 +-
 .../hudi/utilities/schema/HiveSchemaProvider.java  |  6 +--
 .../utilities/schema/JdbcbasedSchemaProvider.java  |  8 +---
 .../utilities/schema/KafkaOffsetPostProcessor.java | 22 +++++----
 .../schema/ProtoClassBasedSchemaProvider.java      |  9 +++-
 .../utilities/schema/RowBasedSchemaProvider.java   |  3 +-
 .../utilities/schema/SchemaRegistryProvider.java   | 56 ++++++++++++----------
 .../hudi/utilities/sources/AvroKafkaSource.java    |  9 ++--
 .../hudi/utilities/sources/GcsEventsSource.java    | 13 +++--
 .../hudi/utilities/sources/HiveIncrPullSource.java |  6 +--
 .../apache/hudi/utilities/sources/JdbcSource.java  |  3 +-
 .../hudi/utilities/sources/ProtoKafkaSource.java   |  8 ++--
 .../hudi/utilities/sources/PulsarSource.java       |  5 +-
 .../utilities/sources/debezium/DebeziumSource.java | 11 ++---
 .../utilities/sources/helpers/AvroConvertor.java   | 45 +++++++++++++----
 .../utilities/transform/ChainedTransformer.java    | 18 +++----
 .../utilities/transform/FlatteningTransformer.java | 20 ++++----
 .../transform/SqlFileBasedTransformer.java         |  7 +--
 .../transform/SqlQueryBasedTransformer.java        | 26 ++++++----
 .../deltastreamer/TestHoodieDeltaStreamer.java     |  3 +-
 .../multisync/TestMultipleMetaSync.java            |  6 +--
 .../functional/TestChainedTransformer.java         |  3 +-
 .../TestErrorTableAwareChainedTransformer.java     |  3 +-
 .../debezium/TestAbstractDebeziumSource.java       |  3 +-
 .../transform/TestSqlFileBasedTransformer.java     |  4 +-
 40 files changed, 541 insertions(+), 178 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
index 8e44dc4b1fe..be86cd37df9 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
@@ -22,6 +22,7 @@ import org.apache.avro.generic.GenericRecord
 import org.apache.avro.{JsonProperties, Schema}
 import org.apache.hudi.HoodieSparkUtils.sparkAdapter
 import org.apache.hudi.avro.AvroSchemaUtils
+import org.apache.hudi.internal.schema.HoodieSchemaException
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
@@ -138,18 +139,26 @@ object AvroConversionUtils {
   def convertStructTypeToAvroSchema(structType: DataType,
                                     structName: String,
                                     recordNamespace: String): Schema = {
-    val schemaConverters = sparkAdapter.getAvroSchemaConverters
-    val avroSchema = schemaConverters.toAvroType(structType, nullable = false, 
structName, recordNamespace)
-    getAvroSchemaWithDefaults(avroSchema, structType)
+    try {
+      val schemaConverters = sparkAdapter.getAvroSchemaConverters
+      val avroSchema = schemaConverters.toAvroType(structType, nullable = 
false, structName, recordNamespace)
+      getAvroSchemaWithDefaults(avroSchema, structType)
+    } catch {
+      case e: Exception => throw new HoodieSchemaException("Failed to convert 
struct type to avro schema: " + structType, e)
+    }
   }
 
   /**
    * Converts Avro's [[Schema]] to Catalyst's [[StructType]]
    */
   def convertAvroSchemaToStructType(avroSchema: Schema): StructType = {
-    val schemaConverters = sparkAdapter.getAvroSchemaConverters
-    schemaConverters.toSqlType(avroSchema) match {
-      case (dataType, _) => dataType.asInstanceOf[StructType]
+    try {
+      val schemaConverters = sparkAdapter.getAvroSchemaConverters
+      schemaConverters.toSqlType(avroSchema) match {
+        case (dataType, _) => dataType.asInstanceOf[StructType]
+      }
+    } catch {
+      case e: Exception => throw new HoodieSchemaException("Failed to convert 
avro schema to struct type: " + avroSchema, e)
     }
   }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/exception/HoodieIncrementalPathNotFoundException.java
 
b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieIncrementalPathNotFoundException.java
new file mode 100644
index 00000000000..cce29f96050
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieIncrementalPathNotFoundException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.exception;
+
+/**
+ * Thrown when path is not found in incremental query.
+ * see [HUDI-2711] for more details on the issue
+ */
+public class HoodieIncrementalPathNotFoundException extends HoodieException {
+
+  public HoodieIncrementalPathNotFoundException(Throwable e) {
+    super("Path not found during incremental query. It is likely that the 
underlying file has been "
+        + "moved or deleted by the cleaner. Consider setting "
+        + "hoodie.datasource.read.incr.fallback.fulltablescan.enable to 
true.", e);
+  }
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/exception/HoodieMetaSyncException.java
 
b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieMetaSyncException.java
new file mode 100644
index 00000000000..ba9e51c2ee1
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieMetaSyncException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.exception;
+
+import java.util.Map;
+
+public class HoodieMetaSyncException extends HoodieException {
+
+  private Map<String,HoodieException> failedMetaSyncs;
+
+  public Map<String,HoodieException> getFailedMetaSyncs() {
+    return failedMetaSyncs;
+  }
+
+  public HoodieMetaSyncException(String msg, Throwable e) {
+    super(msg, e);
+  }
+
+  public HoodieMetaSyncException(String msg, Map<String,HoodieException> 
failedMetaSyncs) {
+    super(msg);
+    this.failedMetaSyncs = failedMetaSyncs;
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 2a2e0d59e7d..f353f78b063 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -59,6 +59,7 @@ import org.apache.hudi.keygen.{BaseKeyGenerator, KeyGenUtils, 
SparkKeyGeneratorI
 import org.apache.hudi.metrics.Metrics
 import org.apache.hudi.sync.common.HoodieSyncConfig
 import org.apache.hudi.sync.common.util.SyncUtilHelpers
+import 
org.apache.hudi.sync.common.util.SyncUtilHelpers.getHoodieMetaSyncException
 import org.apache.hudi.table.BulkInsertPartitioner
 import org.apache.hudi.util.SparkKeyGenUtils
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
@@ -907,18 +908,18 @@ object HoodieSparkSqlWriter {
       
properties.put(HoodieSyncConfig.META_SYNC_USE_FILE_LISTING_FROM_METADATA.key, 
hoodieConfig.getBoolean(HoodieMetadataConfig.ENABLE))
 
       // Collect exceptions in list because we want all sync to run. Then we 
can throw
-      val metaSyncExceptions = new ListBuffer[HoodieException]()
+      val failedMetaSyncs = new mutable.HashMap[String,HoodieException]()
       syncClientToolClassSet.foreach(impl => {
         try {
           SyncUtilHelpers.runHoodieMetaSync(impl.trim, properties, fs.getConf, 
fs, basePath.toString, baseFileFormat)
         } catch {
           case e: HoodieException =>
             log.info("SyncTool class " + impl.trim + " failed with exception", 
e)
-            metaSyncExceptions.add(e)
+            failedMetaSyncs.put(impl, e)
         }
       })
-      if (metaSyncExceptions.nonEmpty) {
-        throw SyncUtilHelpers.getExceptionFromList(metaSyncExceptions)
+      if (failedMetaSyncs.nonEmpty) {
+        throw getHoodieMetaSyncException(failedMetaSyncs)
       }
     }
 
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
index 5624120ae0d..b81a749f613 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
@@ -28,7 +28,7 @@ import org.apache.hudi.common.table.timeline.{HoodieInstant, 
HoodieTimeline}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
 import org.apache.hudi.common.util.{HoodieTimer, InternalSchemaCache}
 import org.apache.hudi.config.HoodieWriteConfig
-import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.exception.{HoodieException, 
HoodieIncrementalPathNotFoundException}
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.internal.schema.utils.SerDeHelper
 import org.apache.hudi.table.HoodieSparkTable
@@ -37,7 +37,7 @@ import org.apache.spark.rdd.RDD
 import 
org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat
 import org.apache.spark.sql.sources.{BaseRelation, TableScan}
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SQLContext}
 import org.slf4j.LoggerFactory
 
 import scala.collection.JavaConversions._
@@ -261,15 +261,25 @@ class IncrementalRelation(val sqlContext: SQLContext,
             }
 
             if (regularFileIdToFullPath.nonEmpty) {
-              df = df.union(sqlContext.read.options(sOpts)
-                .schema(usedSchema).format(formatClassName)
-                // Setting time to the END_INSTANT_TIME, to avoid pathFilter 
filter out files incorrectly.
-                .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key(), 
endInstantTime)
-                .load(filteredRegularFullPaths.toList: _*)
-                .filter(String.format("%s >= '%s'", 
HoodieRecord.COMMIT_TIME_METADATA_FIELD,
-                  commitsToReturn.head.getTimestamp))
-                .filter(String.format("%s <= '%s'", 
HoodieRecord.COMMIT_TIME_METADATA_FIELD,
-                  commitsToReturn.last.getTimestamp)))
+              try {
+                df = df.union(sqlContext.read.options(sOpts)
+                  .schema(usedSchema).format(formatClassName)
+                  // Setting time to the END_INSTANT_TIME, to avoid pathFilter 
filter out files incorrectly.
+                  
.option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key(), endInstantTime)
+                  .load(filteredRegularFullPaths.toList: _*)
+                  .filter(String.format("%s >= '%s'", 
HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+                    commitsToReturn.head.getTimestamp))
+                  .filter(String.format("%s <= '%s'", 
HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+                    commitsToReturn.last.getTimestamp)))
+              } catch {
+                case e : AnalysisException =>
+                  if (e.getMessage.contains("Path does not exist")) {
+                    throw new HoodieIncrementalPathNotFoundException(e)
+                  } else {
+                    throw e
+                  }
+              }
+
             }
             df
           }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFullTableScan.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFullTableScan.scala
index 45b9cb4c902..1d89f105331 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFullTableScan.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFullTableScan.scala
@@ -24,6 +24,7 @@ import 
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN
 import org.apache.hudi.common.table.timeline.{HoodieInstant, 
HoodieInstantTimeGenerator, HoodieTimeline}
 import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
 import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.exception.HoodieIncrementalPathNotFoundException
 import org.apache.hudi.testutils.HoodieSparkClientTestBase
 import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
 import org.apache.spark.SparkException
@@ -174,7 +175,7 @@ class TestIncrementalReadWithFullTableScan extends 
HoodieSparkClientTestBase {
     val msg = "Should fail with Path does not exist"
     tableType match {
       case HoodieTableType.COPY_ON_WRITE =>
-        assertThrows(classOf[AnalysisException], new Executable {
+        assertThrows(classOf[HoodieIncrementalPathNotFoundException], new 
Executable {
           override def execute(): Unit = {
             fn()
           }
diff --git 
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SyncUtilHelpers.java
 
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SyncUtilHelpers.java
index a1c63da989b..036ca3b47a9 100644
--- 
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SyncUtilHelpers.java
+++ 
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SyncUtilHelpers.java
@@ -22,6 +22,7 @@ package org.apache.hudi.sync.common.util;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieMetaSyncException;
 import org.apache.hudi.sync.common.HoodieSyncConfig;
 import org.apache.hudi.sync.common.HoodieSyncTool;
 
@@ -30,7 +31,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collection;
+import java.util.Map;
 import java.util.Properties;
 
 /**
@@ -58,7 +59,7 @@ public class SyncUtilHelpers {
     try (HoodieSyncTool syncTool = instantiateMetaSyncTool(syncToolClassName, 
props, hadoopConfig, fs, targetBasePath, baseFileFormat)) {
       syncTool.syncHoodieTable();
     } catch (Throwable e) {
-      throw new HoodieException("Could not sync using the meta sync class " + 
syncToolClassName, e);
+      throw new HoodieMetaSyncException("Could not sync using the meta sync 
class " + syncToolClassName, e);
     }
   }
 
@@ -106,16 +107,18 @@ public class SyncUtilHelpers {
     }
   }
 
-  public static HoodieException 
getExceptionFromList(Collection<HoodieException> exceptions) {
-    if (exceptions.size() == 1) {
-      return exceptions.stream().findFirst().get();
+  public static HoodieException 
getHoodieMetaSyncException(Map<String,HoodieException> failedMetaSyncs) {
+    if (failedMetaSyncs.size() == 1) {
+      return failedMetaSyncs.values().stream().findFirst().get();
     }
     StringBuilder sb = new StringBuilder();
-    sb.append("Multiple exceptions during meta sync:\n");
-    exceptions.forEach(e -> {
-      sb.append(e.getMessage());
+    sb.append("MetaSyncs failed: {");
+    sb.append(String.join(",", failedMetaSyncs.keySet()));
+    sb.append("}\n");
+    for (String impl : failedMetaSyncs.keySet()) {
+      sb.append(failedMetaSyncs.get(impl).getMessage());
       sb.append("\n");
-    });
-    return new HoodieException(sb.toString());
+    }
+    return new HoodieMetaSyncException(sb.toString(),failedMetaSyncs);
   }
 }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index 16ed7eadc1f..0805c42698f 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -47,6 +47,7 @@ import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.utilities.checkpointing.InitialCheckPointProvider;
 import org.apache.hudi.utilities.config.HoodieSchemaProviderConfig;
 import org.apache.hudi.utilities.config.SchemaProviderPostProcessorConfig;
+import org.apache.hudi.utilities.exception.HoodieSchemaFetchException;
 import org.apache.hudi.utilities.exception.HoodieSchemaPostProcessException;
 import org.apache.hudi.utilities.exception.HoodieSourcePostProcessException;
 import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
@@ -432,13 +433,11 @@ public class UtilHelpers {
   /**
    * Returns true if the table already exists in the JDBC database.
    */
-  private static Boolean tableExists(Connection conn, Map<String, String> 
options) {
+  private static Boolean tableExists(Connection conn, Map<String, String> 
options) throws SQLException {
     JdbcDialect dialect = 
JdbcDialects.get(options.get(JDBCOptions.JDBC_URL()));
     try (PreparedStatement statement = 
conn.prepareStatement(dialect.getTableExistsQuery(options.get(JDBCOptions.JDBC_TABLE_NAME()))))
 {
       
statement.setQueryTimeout(Integer.parseInt(options.get(JDBCOptions.JDBC_QUERY_TIMEOUT())));
       statement.executeQuery();
-    } catch (SQLException e) {
-      throw new HoodieException(e);
     }
     return true;
   }
@@ -450,13 +449,25 @@ public class UtilHelpers {
    * @return
    * @throws Exception
    */
-  public static Schema getJDBCSchema(Map<String, String> options) throws 
Exception {
-    Connection conn = createConnectionFactory(options);
-    String url = options.get(JDBCOptions.JDBC_URL());
-    String table = options.get(JDBCOptions.JDBC_TABLE_NAME());
-    boolean tableExists = tableExists(conn, options);
+  public static Schema getJDBCSchema(Map<String, String> options) {
+    Connection conn;
+    String url;
+    String table;
+    boolean tableExists;
+    try {
+      conn = createConnectionFactory(options);
+      url = options.get(JDBCOptions.JDBC_URL());
+      table = options.get(JDBCOptions.JDBC_TABLE_NAME());
+      tableExists = tableExists(conn, options);
+    } catch (Exception e) {
+      throw new HoodieSchemaFetchException("Failed to connect to jdbc", e);
+    }
 
-    if (tableExists) {
+    if (!tableExists) {
+      throw new HoodieSchemaFetchException(String.format("%s table does not 
exists!", table));
+    }
+
+    try {
       JdbcDialect dialect = JdbcDialects.get(url);
       try (PreparedStatement statement = 
conn.prepareStatement(dialect.getSchemaQuery(table))) {
         
statement.setQueryTimeout(Integer.parseInt(options.get("queryTimeout")));
@@ -470,8 +481,10 @@ public class UtilHelpers {
           return AvroConversionUtils.convertStructTypeToAvroSchema(structType, 
table, "hoodie." + table);
         }
       }
-    } else {
-      throw new HoodieException(String.format("%s table does not exists!", 
table));
+    } catch (HoodieException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new HoodieSchemaFetchException(String.format("Unable to fetch 
schema from %s table", table), e);
     }
   }
 
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 936320a648e..8a0b4050243 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -67,6 +67,7 @@ import org.apache.hudi.config.HoodiePayloadConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieMetaSyncException;
 import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.HiveSyncTool;
 import org.apache.hudi.internal.schema.InternalSchema;
@@ -86,6 +87,8 @@ import 
org.apache.hudi.utilities.callback.pulsar.HoodieWriteCommitPulsarCallback
 import org.apache.hudi.utilities.config.KafkaSourceConfig;
 import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.Config;
 import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException;
+import org.apache.hudi.utilities.exception.HoodieDeltaStreamerWriteException;
+import org.apache.hudi.utilities.exception.HoodieSchemaFetchException;
 import org.apache.hudi.utilities.exception.HoodieSourceTimeoutException;
 import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
 import org.apache.hudi.utilities.schema.DelegatingSchemaProvider;
@@ -153,6 +156,7 @@ import static 
org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_UPSERT;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC;
 import static 
org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC_SPEC;
 import static 
org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory.getKeyGeneratorClassName;
+import static 
org.apache.hudi.sync.common.util.SyncUtilHelpers.getHoodieMetaSyncException;
 import static org.apache.hudi.utilities.UtilHelpers.createRecordMerger;
 import static 
org.apache.hudi.utilities.config.HoodieDeltaStreamerConfig.MUTLI_WRITER_SOURCE_CHECKPOINT_ID;
 import static 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_FORCE_SKIP_PROP;
@@ -868,12 +872,12 @@ public class DeltaSync implements Serializable, Closeable 
{
             case ROLLBACK_COMMIT:
               LOG.info("Commit " + instantTime + " failed!");
               writeClient.rollback(instantTime);
-              throw new HoodieException("Error Table Commit failed!");
+              throw new HoodieDeltaStreamerWriteException("Error table commit 
failed");
             case LOG_ERROR:
               LOG.error("Error Table write failed for instant " + instantTime);
               break;
             default:
-              throw new HoodieException("Write failure strategy not 
implemented for " + errorWriteFailureStrategy);
+              throw new HoodieDeltaStreamerWriteException("Write failure 
strategy not implemented for " + errorWriteFailureStrategy);
           }
         }
       }
@@ -892,7 +896,7 @@ public class DeltaSync implements Serializable, Closeable {
         }
       } else {
         LOG.info("Commit " + instantTime + " failed!");
-        throw new HoodieException("Commit " + instantTime + " failed!");
+        throw new HoodieDeltaStreamerWriteException("Commit " + instantTime + 
" failed!");
       }
     } else {
       LOG.error("Delta Sync found errors when writing. Errors/Total=" + 
totalErrorRecords + "/" + totalRecords);
@@ -905,7 +909,7 @@ public class DeltaSync implements Serializable, Closeable {
       });
       // Rolling back instant
       writeClient.rollback(instantTime);
-      throw new HoodieException("Commit " + instantTime + " failed and 
rolled-back !");
+      throw new HoodieDeltaStreamerWriteException("Commit " + instantTime + " 
failed and rolled-back !");
     }
     long overallTimeMs = overallTimerContext != null ? 
overallTimerContext.stop() : 0;
 
@@ -971,21 +975,20 @@ public class DeltaSync implements Serializable, Closeable 
{
             
props.getInteger(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key())));
       }
 
-      //Collect exceptions in list because we want all sync to run. Then we 
can throw
-      List<HoodieException> metaSyncExceptions = new ArrayList<>();
+      Map<String,HoodieException> failedMetaSyncs = new HashMap<>();
       for (String impl : syncClientToolClasses) {
+        Timer.Context syncContext = metrics.getMetaSyncTimerContext();
         try {
-          Timer.Context syncContext = metrics.getMetaSyncTimerContext();
           SyncUtilHelpers.runHoodieMetaSync(impl.trim(), metaProps, conf, fs, 
cfg.targetBasePath, cfg.baseFileFormat);
-          long metaSyncTimeMs = syncContext != null ? syncContext.stop() : 0;
-          
metrics.updateDeltaStreamerMetaSyncMetrics(getSyncClassShortName(impl), 
metaSyncTimeMs);
-        } catch (HoodieException e) {
-          LOG.info("SyncTool class " + impl.trim() + " failed with exception", 
e);
-          metaSyncExceptions.add(e);
+        } catch (HoodieMetaSyncException e) {
+          LOG.warn("SyncTool class " + impl.trim() + " failed with exception", 
e);
+          failedMetaSyncs.put(impl, e);
         }
+        long metaSyncTimeMs = syncContext != null ? syncContext.stop() : 0;
+        
metrics.updateDeltaStreamerMetaSyncMetrics(getSyncClassShortName(impl), 
metaSyncTimeMs);
       }
-      if (!metaSyncExceptions.isEmpty()) {
-        throw SyncUtilHelpers.getExceptionFromList(metaSyncExceptions);
+      if (!failedMetaSyncs.isEmpty()) {
+        throw getHoodieMetaSyncException(failedMetaSyncs);
       }
     }
   }
@@ -1131,7 +1134,7 @@ public class DeltaSync implements Serializable, Closeable 
{
       }
       return newWriteSchema;
     } catch (Exception e) {
-      throw new HoodieException("Failed to fetch schema from table ", e);
+      throw new HoodieSchemaFetchException("Failed to fetch schema from 
table", e);
     }
   }
 
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieDeltaStreamerWriteException.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieDeltaStreamerWriteException.java
new file mode 100644
index 00000000000..7d57b1a2124
--- /dev/null
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieDeltaStreamerWriteException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.exception;
+
+public class HoodieDeltaStreamerWriteException extends 
HoodieDeltaStreamerException {
+
+  public  HoodieDeltaStreamerWriteException(String msg) {
+    super(msg);
+  }
+
+  public HoodieDeltaStreamerWriteException(String msg, Throwable e) {
+    super(msg, e);
+  }
+}
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieReadFromSourceException.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieReadFromSourceException.java
new file mode 100644
index 00000000000..33c0d0d2dd2
--- /dev/null
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieReadFromSourceException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.exception;
+
+import org.apache.hudi.exception.HoodieException;
+
+public class HoodieReadFromSourceException extends HoodieException {
+
+  public HoodieReadFromSourceException(String msg) {
+    super(msg);
+  }
+
+  public HoodieReadFromSourceException(String msg, Throwable e) {
+    super(msg, e);
+  }
+}
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieSchemaFetchException.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieSchemaFetchException.java
new file mode 100644
index 00000000000..d2f80e068d5
--- /dev/null
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieSchemaFetchException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.exception;
+
+/*
+ * Thrown when a schema provider is unable to fetch the schema
+ */
+public class HoodieSchemaFetchException extends HoodieSchemaProviderException {
+
+  public HoodieSchemaFetchException(String msg) {
+    super(msg);
+  }
+
+  public HoodieSchemaFetchException(String msg, Throwable e) {
+    super(msg, e);
+  }
+}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieTransformException.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieTransformException.java
new file mode 100644
index 00000000000..ecf2c9e770c
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieTransformException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.exception;
+
+import org.apache.hudi.exception.HoodieException;
+
+/**
+ * general exception in transformer
+ */
+public class HoodieTransformException extends HoodieException {
+  public HoodieTransformException(String msg, Throwable e) {
+    super(msg, e);
+  }
+
+  public HoodieTransformException(String msg) {
+    super(msg);
+  }
+}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieTransformExecutionException.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieTransformExecutionException.java
new file mode 100644
index 00000000000..e7cf76fcc5c
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieTransformExecutionException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.exception;
+
+/**
+ * Exception that occurs during transformer execution
+ */
+public class HoodieTransformExecutionException extends HoodieTransformException {
+  public HoodieTransformExecutionException(String msg, Throwable e) {
+    super(msg, e);
+  }
+}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieTransformPlanException.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieTransformPlanException.java
new file mode 100644
index 00000000000..15895d3c9d6
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieTransformPlanException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.exception;
+
+/**
+ * Exception that occurs during planning of transformation
+ */
+public class HoodieTransformPlanException extends HoodieTransformException {
+
+  public HoodieTransformPlanException(String msg) {
+    super(msg);
+  }
+
+  public HoodieTransformPlanException(String msg, Throwable e) {
+    super(msg, e);
+  }
+}
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java
index 9956e1528a4..92666197ca6 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java
@@ -22,8 +22,8 @@ import org.apache.hudi.DataSourceUtils;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.util.FileIOUtils;
-import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.utilities.config.FilebasedSchemaProviderConfig;
+import org.apache.hudi.utilities.exception.HoodieSchemaProviderException;
 import org.apache.hudi.utilities.sources.helpers.SanitizationUtils;
 
 import org.apache.avro.Schema;
@@ -78,7 +78,7 @@ public class FilebasedSchemaProvider extends SchemaProvider {
     try (FSDataInputStream in = fs.open(new Path(schemaPath))) {
       schemaStr = FileIOUtils.readAsUTFString(in);
     } catch (IOException ioe) {
-      throw new HoodieIOException(String.format("Error reading schema from file %s", schemaPath), ioe);
+      throw new HoodieSchemaProviderException(String.format("Error reading schema from file %s", schemaPath), ioe);
     }
     return SanitizationUtils.parseAvroSchema(schemaStr, sanitizeSchema, invalidCharMask);
   }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/HiveSchemaProvider.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/HiveSchemaProvider.java
index e6b113b2782..dd8b4de944c 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/HiveSchemaProvider.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/HiveSchemaProvider.java
@@ -23,7 +23,7 @@ import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.DataSourceUtils;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.utilities.config.HiveSchemaProviderConfig;
-import org.apache.hudi.utilities.exception.HoodieSchemaProviderException;
+import org.apache.hudi.utilities.exception.HoodieSchemaFetchException;
 
 import org.apache.avro.Schema;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -59,7 +59,7 @@ public class HiveSchemaProvider extends SchemaProvider {
           sourceSchemaTableName,
           "hoodie." + sourceSchemaDatabaseName);
     } catch (NoSuchTableException | NoSuchDatabaseException e) {
-      throw new HoodieSchemaProviderException(String.format("Can't find Hive 
table: %s.%s", sourceSchemaDatabaseName, sourceSchemaTableName), e);
+      throw new HoodieSchemaFetchException(String.format("Can't find Hive 
table: %s.%s", sourceSchemaDatabaseName, sourceSchemaTableName), e);
     }
 
     // target schema
@@ -74,7 +74,7 @@ public class HiveSchemaProvider extends SchemaProvider {
             targetSchemaTableName,
             "hoodie." + targetSchemaDatabaseName);
       } catch (NoSuchDatabaseException | NoSuchTableException e) {
-        throw new HoodieSchemaProviderException(String.format("Can't find Hive 
table: %s.%s", targetSchemaDatabaseName, targetSchemaTableName), e);
+        throw new HoodieSchemaFetchException(String.format("Can't find Hive 
table: %s.%s", targetSchemaDatabaseName, targetSchemaTableName), e);
       }
     }
   }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/JdbcbasedSchemaProvider.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/JdbcbasedSchemaProvider.java
index c548db3650f..722d2da551a 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/JdbcbasedSchemaProvider.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/JdbcbasedSchemaProvider.java
@@ -19,7 +19,6 @@
 package org.apache.hudi.utilities.schema;
 
 import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.hudi.utilities.config.JdbcbasedSchemaProviderConfig;
 
@@ -55,11 +54,6 @@ public class JdbcbasedSchemaProvider extends SchemaProvider {
       return sourceSchema;
     }
 
-    try {
-      sourceSchema = UtilHelpers.getJDBCSchema(options);
-    } catch (Exception e) {
-      throw new HoodieException("Failed to get Schema through jdbc. ", e);
-    }
-    return sourceSchema;
+    return UtilHelpers.getJDBCSchema(options);
   }
 }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/KafkaOffsetPostProcessor.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/KafkaOffsetPostProcessor.java
index dce1db3afc1..5b0e39bc018 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/KafkaOffsetPostProcessor.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/KafkaOffsetPostProcessor.java
@@ -20,6 +20,7 @@ package org.apache.hudi.utilities.schema;
 
 import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.internal.schema.HoodieSchemaException;
 import org.apache.hudi.utilities.config.HoodieDeltaStreamerConfig;
 
 import org.apache.avro.Schema;
@@ -59,13 +60,18 @@ public class KafkaOffsetPostProcessor extends 
SchemaPostProcessor {
   @Override
   public Schema processSchema(Schema schema) {
     // this method adds kafka offset fields namely source offset, partition 
and timestamp to the schema of the batch.
-    List<Schema.Field> fieldList = schema.getFields();
-    List<Schema.Field> newFieldList = fieldList.stream()
-        .map(f -> new Schema.Field(f.name(), f.schema(), f.doc(), 
f.defaultVal())).collect(Collectors.toList());
-    newFieldList.add(new Schema.Field(KAFKA_SOURCE_OFFSET_COLUMN, 
Schema.create(Schema.Type.LONG), "offset column", 0));
-    newFieldList.add(new Schema.Field(KAFKA_SOURCE_PARTITION_COLUMN, 
Schema.create(Schema.Type.INT), "partition column", 0));
-    newFieldList.add(new Schema.Field(KAFKA_SOURCE_TIMESTAMP_COLUMN, 
Schema.create(Schema.Type.LONG), "timestamp column", 0));
-    Schema newSchema = Schema.createRecord(schema.getName() + "_processed", 
schema.getDoc(), schema.getNamespace(), false, newFieldList);
-    return newSchema;
+    try {
+      List<Schema.Field> fieldList = schema.getFields();
+      List<Schema.Field> newFieldList = fieldList.stream()
+          .map(f -> new Schema.Field(f.name(), f.schema(), f.doc(), 
f.defaultVal())).collect(Collectors.toList());
+      newFieldList.add(new Schema.Field(KAFKA_SOURCE_OFFSET_COLUMN, 
Schema.create(Schema.Type.LONG), "offset column", 0));
+      newFieldList.add(new Schema.Field(KAFKA_SOURCE_PARTITION_COLUMN, 
Schema.create(Schema.Type.INT), "partition column", 0));
+      newFieldList.add(new Schema.Field(KAFKA_SOURCE_TIMESTAMP_COLUMN, 
Schema.create(Schema.Type.LONG), "timestamp column", 0));
+      Schema newSchema = Schema.createRecord(schema.getName() + "_processed", 
schema.getDoc(), schema.getNamespace(), false, newFieldList);
+      return newSchema;
+    } catch (Exception e) {
+      throw new HoodieSchemaException("Kafka offset post processor failed with 
schema: " + schema, e);
+    }
+
   }
 }
\ No newline at end of file
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/ProtoClassBasedSchemaProvider.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/ProtoClassBasedSchemaProvider.java
index 92eca1f6a89..ac7eb811c43 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/ProtoClassBasedSchemaProvider.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/ProtoClassBasedSchemaProvider.java
@@ -84,8 +84,13 @@ public class ProtoClassBasedSchemaProvider extends 
SchemaProvider {
   @Override
   public Schema getSourceSchema() {
     if (schema == null) {
-      Schema.Parser parser = new Schema.Parser();
-      schema = parser.parse(schemaString);
+      try {
+        Schema.Parser parser = new Schema.Parser();
+        schema = parser.parse(schemaString);
+      } catch (Exception e) {
+        throw new HoodieSchemaException("Failed to parse schema: " + 
schemaString, e);
+      }
+
     }
     return schema;
   }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/RowBasedSchemaProvider.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/RowBasedSchemaProvider.java
index 8fe8e6c334a..29504c01c00 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/RowBasedSchemaProvider.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/RowBasedSchemaProvider.java
@@ -44,7 +44,6 @@ public class RowBasedSchemaProvider extends SchemaProvider {
 
   @Override
   public Schema getSourceSchema() {
-    return AvroConversionUtils.convertStructTypeToAvroSchema(rowStruct, 
HOODIE_RECORD_STRUCT_NAME,
-        HOODIE_RECORD_NAMESPACE);
+    return AvroConversionUtils.convertStructTypeToAvroSchema(rowStruct, 
HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE);
   }
 }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
index b26be1b421b..ca779260093 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
@@ -21,8 +21,9 @@ package org.apache.hudi.utilities.schema;
 import org.apache.hudi.DataSourceUtils;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.ReflectionUtils;
-import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.internal.schema.HoodieSchemaException;
 import org.apache.hudi.utilities.config.HoodieSchemaProviderConfig;
+import org.apache.hudi.utilities.exception.HoodieSchemaFetchException;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -89,12 +90,16 @@ public class SchemaRegistryProvider extends SchemaProvider {
     String convert(String schema) throws IOException;
   }
 
-  public Schema parseSchemaFromRegistry(String registryUrl) throws IOException 
{
+  public Schema parseSchemaFromRegistry(String registryUrl) {
     String schema = fetchSchemaFromRegistry(registryUrl);
-    SchemaConverter converter = 
config.containsKey(HoodieSchemaProviderConfig.SCHEMA_CONVERTER.key())
-        ? 
ReflectionUtils.loadClass(config.getString(HoodieSchemaProviderConfig.SCHEMA_CONVERTER.key()))
-        : s -> s;
-    return new Schema.Parser().parse(converter.convert(schema));
+    try {
+      SchemaConverter converter = 
config.containsKey(HoodieSchemaProviderConfig.SCHEMA_CONVERTER.key())
+          ? 
ReflectionUtils.loadClass(config.getString(HoodieSchemaProviderConfig.SCHEMA_CONVERTER.key()))
+          : s -> s;
+      return new Schema.Parser().parse(converter.convert(schema));
+    } catch (Exception e) {
+      throw new HoodieSchemaException("Failed to parse schema from registry: " 
+ schema, e);
+    }
   }
 
   /**
@@ -105,22 +110,25 @@ public class SchemaRegistryProvider extends 
SchemaProvider {
    *
    * @param registryUrl
    * @return the Schema in String form.
-   * @throws IOException
    */
-  public String fetchSchemaFromRegistry(String registryUrl) throws IOException 
{
-    HttpURLConnection connection;
-    Matcher matcher = Pattern.compile("://(.*?)@").matcher(registryUrl);
-    if (matcher.find()) {
-      String creds = matcher.group(1);
-      String urlWithoutCreds = registryUrl.replace(creds + "@", "");
-      connection = getConnection(urlWithoutCreds);
-      setAuthorizationHeader(matcher.group(1), connection);
-    } else {
-      connection = getConnection(registryUrl);
+  public String fetchSchemaFromRegistry(String registryUrl) {
+    try {
+      HttpURLConnection connection;
+      Matcher matcher = Pattern.compile("://(.*?)@").matcher(registryUrl);
+      if (matcher.find()) {
+        String creds = matcher.group(1);
+        String urlWithoutCreds = registryUrl.replace(creds + "@", "");
+        connection = getConnection(urlWithoutCreds);
+        setAuthorizationHeader(matcher.group(1), connection);
+      } else {
+        connection = getConnection(registryUrl);
+      }
+      ObjectMapper mapper = new ObjectMapper();
+      JsonNode node = mapper.readTree(getStream(connection));
+      return node.get("schema").asText();
+    } catch (Exception e) {
+      throw new HoodieSchemaFetchException("Failed to fetch schema from 
registry", e);
     }
-    ObjectMapper mapper = new ObjectMapper();
-    JsonNode node = mapper.readTree(getStream(connection));
-    return node.get("schema").asText();
   }
 
   private SSLSocketFactory sslSocketFactory;
@@ -182,8 +190,8 @@ public class SchemaRegistryProvider extends SchemaProvider {
     String registryUrl = 
config.getString(HoodieSchemaProviderConfig.SRC_SCHEMA_REGISTRY_URL.key());
     try {
       return parseSchemaFromRegistry(registryUrl);
-    } catch (IOException ioe) {
-      throw new HoodieIOException("Error reading source schema from registry 
:" + registryUrl, ioe);
+    } catch (Exception e) {
+      throw new HoodieSchemaFetchException("Error reading source schema from 
registry :" + registryUrl, e);
     }
   }
 
@@ -193,8 +201,8 @@ public class SchemaRegistryProvider extends SchemaProvider {
     String targetRegistryUrl = 
config.getString(HoodieSchemaProviderConfig.TARGET_SCHEMA_REGISTRY_URL.key(), 
registryUrl);
     try {
       return parseSchemaFromRegistry(targetRegistryUrl);
-    } catch (IOException ioe) {
-      throw new HoodieIOException("Error reading target schema from registry 
:" + registryUrl, ioe);
+    } catch (Exception e) {
+      throw new HoodieSchemaFetchException("Error reading target schema from 
registry :" + targetRegistryUrl, e);
     }
   }
 }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
index fd195621fd7..467b77c5adb 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
@@ -20,10 +20,9 @@ package org.apache.hudi.utilities.sources;
 
 import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer;
+import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
 import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.AvroConvertor;
@@ -71,14 +70,14 @@ public class AvroKafkaSource extends 
KafkaSource<GenericRecord> {
       props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, 
Class.forName(deserializerClassName).getName());
       if 
(deserializerClassName.equals(KafkaAvroSchemaDeserializer.class.getName())) {
         if (schemaProvider == null) {
-          throw new HoodieIOException("SchemaProvider has to be set to use 
KafkaAvroSchemaDeserializer");
+          throw new HoodieReadFromSourceException("SchemaProvider has to be 
set to use KafkaAvroSchemaDeserializer");
         }
         props.put(KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA, 
schemaProvider.getSourceSchema().toString());
       }
     } catch (ClassNotFoundException e) {
       String error = "Could not load custom avro kafka deserializer: " + 
deserializerClassName;
       LOG.error(error);
-      throw new HoodieException(error, e);
+      throw new HoodieReadFromSourceException(error, e);
     }
     this.offsetGen = new KafkaOffsetGen(props);
   }
@@ -88,7 +87,7 @@ public class AvroKafkaSource extends 
KafkaSource<GenericRecord> {
     JavaRDD<ConsumerRecord<Object, Object>> kafkaRDD;
     if (deserializerClassName.equals(ByteArrayDeserializer.class.getName())) {
       if (schemaProvider == null) {
-        throw new HoodieException("Please provide a valid schema provider 
class when use ByteArrayDeserializer!");
+        throw new HoodieReadFromSourceException("Please provide a valid schema 
provider class when use ByteArrayDeserializer!");
       }
 
       //Don't want kafka offsets here so we use originalSchemaProvider
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsSource.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsSource.java
index 93fe9fa4cfe..20538f9f944 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsSource.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsSource.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.gcs.MessageBatch;
 import org.apache.hudi.utilities.sources.helpers.gcs.MessageValidity;
@@ -127,8 +128,14 @@ public class GcsEventsSource extends RowSource {
   @Override
   protected Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> 
lastCkptStr, long sourceLimit) {
     LOG.info("fetchNextBatch(): Input checkpoint: " + lastCkptStr);
-
-    MessageBatch messageBatch = fetchFileMetadata();
+    MessageBatch messageBatch;
+    try {
+      messageBatch = fetchFileMetadata();
+    } catch (HoodieException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new HoodieReadFromSourceException("Failed to fetch file metadata 
from GCS events source", e);
+    }
 
     if (messageBatch.isEmpty()) {
       LOG.info("No new data. Returning empty batch with checkpoint value: " + 
CHECKPOINT_VALUE_ZERO);
@@ -197,7 +204,7 @@ public class GcsEventsSource extends RowSource {
       pubsubMessagesFetcher.sendAcks(messagesToAck);
       messagesToAck.clear();
     } catch (IOException e) {
-      throw new HoodieException("Error when acknowledging messages from 
Pubsub", e);
+      throw new HoodieReadFromSourceException("Error when acknowledging 
messages from Pubsub", e);
     }
   }
 
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HiveIncrPullSource.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HiveIncrPullSource.java
index 6083046eef1..529aa0dd655 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HiveIncrPullSource.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HiveIncrPullSource.java
@@ -22,9 +22,9 @@ import org.apache.hudi.DataSourceUtils;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.utilities.HiveIncrementalPuller;
 import org.apache.hudi.utilities.config.HiveIncrPullSourceConfig;
+import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 
 import org.apache.avro.generic.GenericRecord;
@@ -132,8 +132,8 @@ public class HiveIncrPullSource extends AvroSource {
       sparkContext.setJobGroup(this.getClass().getSimpleName(), "Fetch new 
data");
       return new InputBatch<>(Option.of(avroRDD.keys().map(r -> 
((GenericRecord) r.datum()))),
           String.valueOf(commitToPull.get()));
-    } catch (IOException ioe) {
-      throw new HoodieIOException("Unable to read from source from checkpoint: 
" + lastCheckpointStr, ioe);
+    } catch (Exception e) {
+      throw new HoodieReadFromSourceException("Unable to read from source from 
checkpoint: " + lastCheckpointStr, e);
     }
   }
 }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JdbcSource.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JdbcSource.java
index ae0221c8095..92084e0733f 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JdbcSource.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JdbcSource.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.utilities.SqlQueryBuilder;
 import org.apache.hudi.utilities.config.JdbcSourceConfig;
+import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -256,7 +257,7 @@ public class JdbcSource extends RowSource {
       }
     } catch (Exception e) {
       LOG.error("Failed to checkpoint");
-      throw new HoodieException("Failed to checkpoint. Last checkpoint: " + 
lastCkptStr.orElse(null), e);
+      throw new HoodieReadFromSourceException("Failed to checkpoint. Last 
checkpoint: " + lastCkptStr.orElse(null), e);
     }
   }
 
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java
index 1f6653e9655..7c90f7a0daa 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java
@@ -21,8 +21,8 @@ package org.apache.hudi.utilities.sources;
 import org.apache.hudi.DataSourceUtils;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.ReflectionUtils;
-import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig;
+import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
 import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
@@ -59,7 +59,7 @@ public class ProtoKafkaSource extends KafkaSource<Message> {
     className = 
props.getString(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME.key());
     this.offsetGen = new KafkaOffsetGen(props);
     if (this.shouldAddOffsets) {
-      throw new HoodieException("Appending kafka offsets to ProtoKafkaSource 
is not supported");
+      throw new HoodieReadFromSourceException("Appending kafka offsets to 
ProtoKafkaSource is not supported");
     }
   }
 
@@ -83,7 +83,7 @@ public class ProtoKafkaSource extends KafkaSource<Message> {
       try {
         return (Message) getParseMethod().invoke(getClass(), bytes);
       } catch (IllegalAccessException | InvocationTargetException ex) {
-        throw new HoodieException("Failed to parse proto message from kafka", 
ex);
+        throw new HoodieReadFromSourceException("Failed to parse proto message 
from kafka", ex);
       }
     }
 
@@ -99,7 +99,7 @@ public class ProtoKafkaSource extends KafkaSource<Message> {
         try {
           parseMethod = getProtoClass().getMethod("parseFrom", byte[].class);
         } catch (NoSuchMethodException ex) {
-          throw new HoodieException("Unable to get proto parsing method from 
specified class: " + className, ex);
+          throw new HoodieReadFromSourceException("Unable to get proto parsing 
method from specified class: " + className, ex);
         }
       }
       return parseMethod;
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java
index b1686c3fbe4..2a75247dc17 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java
@@ -27,6 +27,7 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.util.Lazy;
 import org.apache.hudi.utilities.config.PulsarSourceConfig;
+import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 
 import org.apache.pulsar.client.api.Consumer;
@@ -179,7 +180,7 @@ public class PulsarSource extends RowSource implements 
Closeable {
       pulsarConsumer.get().acknowledgeCumulative(latestConsumedOffset);
     } catch (PulsarClientException e) {
       LOG.error(String.format("Failed to ack messageId (%s) for topic '%s'", 
latestConsumedOffset, topicName), e);
-      throw new HoodieIOException("Failed to ack message for topic", e);
+      throw new HoodieReadFromSourceException("Failed to ack message for 
topic", e);
     }
   }
 
@@ -188,7 +189,7 @@ public class PulsarSource extends RowSource implements 
Closeable {
       return pulsarConsumer.get().getLastMessageId();
     } catch (PulsarClientException e) {
       LOG.error(String.format("Failed to fetch latest messageId for topic 
'%s'", topicName), e);
-      throw new HoodieIOException("Failed to fetch latest messageId for 
topic", e);
+      throw new HoodieReadFromSourceException("Failed to fetch latest 
messageId for topic", e);
     }
   }
 
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java
index 20454bd80ef..efdbbc06a4f 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java
@@ -23,9 +23,9 @@ import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.utilities.config.HoodieSchemaProviderConfig;
 import org.apache.hudi.utilities.config.KafkaSourceConfig;
+import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
 import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaRegistryProvider;
@@ -53,7 +53,6 @@ import org.apache.spark.streaming.kafka010.OffsetRange;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -92,7 +91,7 @@ public abstract class DebeziumSource extends RowSource {
     } catch (ClassNotFoundException e) {
       String error = "Could not load custom avro kafka deserializer: " + 
deserializerClassName;
       LOG.error(error);
-      throw new HoodieException(error, e);
+      throw new HoodieReadFromSourceException(error, e);
     }
 
     // Currently, debezium source requires Confluent/Kafka schema-registry to 
fetch the latest schema.
@@ -125,9 +124,9 @@ public abstract class DebeziumSource extends RowSource {
         LOG.info(String.format("Spark schema of Kafka Payload for topic 
%s:\n%s", offsetGen.getTopicName(), dataset.schema().treeString()));
         LOG.info(String.format("New checkpoint string: %s", 
CheckpointUtils.offsetsToStr(offsetRanges)));
         return Pair.of(Option.of(dataset), overrideCheckpointStr.isEmpty() ? 
CheckpointUtils.offsetsToStr(offsetRanges) : overrideCheckpointStr);
-      } catch (IOException exc) {
-        LOG.error("Fatal error reading and parsing incoming debezium event", 
exc);
-        throw new HoodieException("Fatal error reading and parsing incoming 
debezium event", exc);
+      } catch (Exception e) {
+        LOG.error("Fatal error reading and parsing incoming debezium event", 
e);
+        throw new HoodieReadFromSourceException("Fatal error reading and 
parsing incoming debezium event", e);
       }
     }
   }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java
index 65aa7346c7d..84a50b24a28 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.utilities.sources.helpers;
 
 import org.apache.hudi.avro.MercifulJsonConverter;
+import org.apache.hudi.internal.schema.HoodieSchemaException;
 
 import com.google.protobuf.Message;
 import com.twitter.bijection.Injection;
@@ -29,6 +30,7 @@ import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 
 import java.io.Serializable;
+import java.util.Arrays;
 
 import scala.util.Either;
 import scala.util.Left;
@@ -108,9 +110,17 @@ public class AvroConvertor implements Serializable {
   }
 
   public GenericRecord fromJson(String json) {
-    initSchema();
-    initJsonConvertor();
-    return jsonConverter.convert(json, schema);
+    try {
+      initSchema();
+      initJsonConvertor();
+      return jsonConverter.convert(json, schema);
+    } catch (Exception e) {
+      if (json != null) {
+        throw new HoodieSchemaException("Failed to convert schema from json to 
avro: " + json, e);
+      } else {
+        throw new HoodieSchemaException("Failed to convert schema from json to 
avro. Schema string was null.", e);
+      }
+    }
   }
 
   public Either<GenericRecord,String> fromJsonWithError(String json) {
@@ -124,18 +134,35 @@ public class AvroConvertor implements Serializable {
   }
 
   public Schema getSchema() {
-    return new Schema.Parser().parse(schemaStr);
+    try {
+      return new Schema.Parser().parse(schemaStr);
+    } catch (Exception e) {
+      throw new HoodieSchemaException("Failed to parse json schema: " + 
schemaStr, e);
+    }
   }
 
   public GenericRecord fromAvroBinary(byte[] avroBinary) {
-    initSchema();
-    initInjection();
-    return recordInjection.invert(avroBinary).get();
+    try {
+      initSchema();
+      initInjection();
+      return recordInjection.invert(avroBinary).get();
+    } catch (Exception e) {
+      if (avroBinary != null) {
+        throw new HoodieSchemaException("Failed to get avro schema from avro 
binary: " + Arrays.toString(avroBinary), e);
+      } else {
+        throw new HoodieSchemaException("Failed to get avro schema from avro 
binary. Binary is null", e);
+      }
+    }
+
   }
 
   public GenericRecord fromProtoMessage(Message message) {
-    initSchema();
-    return ProtoConversionUtil.convertToAvro(schema, message);
+    try {
+      initSchema();
+      return ProtoConversionUtil.convertToAvro(schema, message);
+    } catch (Exception e) {
+      throw new HoodieSchemaException("Failed to get avro schema from proto 
message", e);
+    }
   }
 
   /**
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java
index cbc66ff4be7..8945802355d 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java
@@ -22,8 +22,8 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
+import org.apache.hudi.utilities.exception.HoodieTransformPlanException;
 
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
@@ -72,7 +72,7 @@ public class ChainedTransformer implements Transformer {
       } else {
         String[] splits = 
configuredTransformer.split(ID_TRANSFORMER_CLASS_NAME_DELIMITER);
         if (splits.length > 2) {
-          throw new IllegalArgumentException("There should only be one colon 
in a configured transformer");
+          throw new HoodieTransformPlanException("There should only be one 
colon in a configured transformer");
         }
         String id = splits[0];
         validateIdentifier(id, identifiers, configuredTransformer);
@@ -80,10 +80,10 @@ public class ChainedTransformer implements Transformer {
         transformers.add(new TransformerInfo(transformer, id));
       }
     }
-
-    
ValidationUtils.checkArgument(transformers.stream().allMatch(TransformerInfo::hasIdentifier)
-            || transformers.stream().noneMatch(TransformerInfo::hasIdentifier),
-        "Either all transformers should have identifier or none should");
+    if (!(transformers.stream().allMatch(TransformerInfo::hasIdentifier)
+        || transformers.stream().noneMatch(TransformerInfo::hasIdentifier))) {
+      throw new HoodieTransformPlanException("Either all transformers should 
have identifier or none should");
+    }
   }
 
   public List<String> getTransformersNames() {
@@ -101,9 +101,11 @@ public class ChainedTransformer implements Transformer {
   }
 
   private void validateIdentifier(String id, Set<String> identifiers, String 
configuredTransformer) {
-    ValidationUtils.checkArgument(StringUtils.nonEmpty(id), 
String.format("Transformer identifier is empty for %s", configuredTransformer));
+    if (StringUtils.isNullOrEmpty(id)) {
+      throw new HoodieTransformPlanException(String.format("Transformer 
identifier is empty for %s", configuredTransformer));
+    }
     if (identifiers.contains(id)) {
-      throw new IllegalArgumentException(String.format("Duplicate identifier 
%s found for transformer %s", id, configuredTransformer));
+      throw new HoodieTransformPlanException(String.format("Duplicate 
identifier %s found for transformer %s", id, configuredTransformer));
     } else {
       identifiers.add(id);
     }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/FlatteningTransformer.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/FlatteningTransformer.java
index af5d3a44b06..1256491528d 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/FlatteningTransformer.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/FlatteningTransformer.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.utilities.transform;
 
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.utilities.exception.HoodieTransformExecutionException;
 
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
@@ -45,14 +46,17 @@ public class FlatteningTransformer implements Transformer {
   @Override
   public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, 
Dataset<Row> rowDataset,
       TypedProperties properties) {
-
-    // tmp table name doesn't like dashes
-    String tmpTable = 
TMP_TABLE.concat(UUID.randomUUID().toString().replace("-", "_"));
-    LOG.info("Registering tmp table : " + tmpTable);
-    rowDataset.createOrReplaceTempView(tmpTable);
-    Dataset<Row> transformed = sparkSession.sql("select " + 
flattenSchema(rowDataset.schema(), null) + " from " + tmpTable);
-    sparkSession.catalog().dropTempView(tmpTable);
-    return transformed;
+    try {
+      // tmp table name doesn't like dashes
+      String tmpTable = 
TMP_TABLE.concat(UUID.randomUUID().toString().replace("-", "_"));
+      LOG.info("Registering tmp table : " + tmpTable);
+      rowDataset.createOrReplaceTempView(tmpTable);
+      Dataset<Row> transformed = sparkSession.sql("select " + 
flattenSchema(rowDataset.schema(), null) + " from " + tmpTable);
+      sparkSession.catalog().dropTempView(tmpTable);
+      return transformed;
+    }  catch (Exception e) {
+      throw new HoodieTransformExecutionException("Failed to apply flattening 
transformer", e);
+    }
   }
 
   public String flattenSchema(StructType schema, String prefix) {
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlFileBasedTransformer.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlFileBasedTransformer.java
index 2a58c498b46..98f1ce482b8 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlFileBasedTransformer.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlFileBasedTransformer.java
@@ -20,8 +20,9 @@ package org.apache.hudi.utilities.transform;
 
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.utilities.config.SqlTransformerConfig;
+import org.apache.hudi.utilities.exception.HoodieTransformException;
+import org.apache.hudi.utilities.exception.HoodieTransformExecutionException;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -70,7 +71,7 @@ public class SqlFileBasedTransformer implements Transformer {
 
     final String sqlFile = 
props.getString(SqlTransformerConfig.TRANSFORMER_SQL_FILE.key());
     if (null == sqlFile) {
-      throw new IllegalArgumentException(
+      throw new HoodieTransformException(
           "Missing required configuration : (" + 
SqlTransformerConfig.TRANSFORMER_SQL_FILE.key() + ")");
     }
 
@@ -96,7 +97,7 @@ public class SqlFileBasedTransformer implements Transformer {
       }
       return rows;
     } catch (final IOException ioe) {
-      throw new HoodieIOException("Error reading transformer SQL file.", ioe);
+      throw new HoodieTransformExecutionException("Error reading transformer 
SQL file.", ioe);
     } finally {
       sparkSession.catalog().dropTempView(tmpTable);
     }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlQueryBasedTransformer.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlQueryBasedTransformer.java
index aaabe3dce5a..c9b410121c7 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlQueryBasedTransformer.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlQueryBasedTransformer.java
@@ -20,6 +20,8 @@ package org.apache.hudi.utilities.transform;
 
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.utilities.config.SqlTransformerConfig;
+import org.apache.hudi.utilities.exception.HoodieTransformException;
+import org.apache.hudi.utilities.exception.HoodieTransformExecutionException;
 
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
@@ -47,17 +49,21 @@ public class SqlQueryBasedTransformer implements 
Transformer {
       TypedProperties properties) {
     String transformerSQL = 
properties.getString(SqlTransformerConfig.TRANSFORMER_SQL.key());
     if (null == transformerSQL) {
-      throw new IllegalArgumentException("Missing configuration : (" + 
SqlTransformerConfig.TRANSFORMER_SQL.key() + ")");
+      throw new HoodieTransformException("Missing configuration : (" + 
SqlTransformerConfig.TRANSFORMER_SQL.key() + ")");
     }
 
-    // tmp table name doesn't like dashes
-    String tmpTable = 
TMP_TABLE.concat(UUID.randomUUID().toString().replace("-", "_"));
-    LOG.info("Registering tmp table : " + tmpTable);
-    rowDataset.createOrReplaceTempView(tmpTable);
-    String sqlStr = transformerSQL.replaceAll(SRC_PATTERN, tmpTable);
-    LOG.debug("SQL Query for transformation : (" + sqlStr + ")");
-    Dataset<Row> transformed = sparkSession.sql(sqlStr);
-    sparkSession.catalog().dropTempView(tmpTable);
-    return transformed;
+    try {
+      // tmp table name doesn't like dashes
+      String tmpTable = 
TMP_TABLE.concat(UUID.randomUUID().toString().replace("-", "_"));
+      LOG.info("Registering tmp table : " + tmpTable);
+      rowDataset.createOrReplaceTempView(tmpTable);
+      String sqlStr = transformerSQL.replaceAll(SRC_PATTERN, tmpTable);
+      LOG.debug("SQL Query for transformation : (" + sqlStr + ")");
+      Dataset<Row> transformed = sparkSession.sql(sqlStr);
+      sparkSession.catalog().dropTempView(tmpTable);
+      return transformed;
+    } catch (Exception e) {
+      throw new HoodieTransformExecutionException("Failed to apply sql query 
based transformer", e);
+    }
   }
 }
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index f6b70356ec2..77da8bfdb12 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -57,6 +57,7 @@ import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieLockConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIncrementalPathNotFoundException;
 import org.apache.hudi.exception.TableNotFoundException;
 import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.HoodieHiveSyncClient;
@@ -2415,7 +2416,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
 
     insertInTable(tableBasePath, 9, WriteOperationType.UPSERT);
     //No change as this fails with Path not exist error
-    assertThrows(org.apache.spark.sql.AnalysisException.class, () -> new 
HoodieDeltaStreamer(downstreamCfg, jsc).sync());
+    assertThrows(HoodieIncrementalPathNotFoundException.class, () -> new 
HoodieDeltaStreamer(downstreamCfg, jsc).sync());
     TestHelpers.assertRecordCount(1000, downstreamTableBasePath, sqlContext);
 
     if (downstreamCfg.configs == null) {
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/TestMultipleMetaSync.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/TestMultipleMetaSync.java
index 6c3caf72693..4a47cd3c64b 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/TestMultipleMetaSync.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/TestMultipleMetaSync.java
@@ -19,7 +19,7 @@
 package org.apache.hudi.utilities.deltastreamer.multisync;
 
 import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieMetaSyncException;
 import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
 import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerTestBase;
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
@@ -61,7 +61,7 @@ public class TestMultipleMetaSync extends 
HoodieDeltaStreamerTestBase {
     MockSyncTool1.syncSuccess = false;
     MockSyncTool2.syncSuccess = false;
     HoodieDeltaStreamer.Config cfg = getConfig(tableBasePath, syncClassNames);
-    Exception e = assertThrows(HoodieException.class, () -> new 
HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync());
+    Exception e = assertThrows(HoodieMetaSyncException.class, () -> new 
HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync());
     
assertTrue(e.getMessage().contains(MockSyncToolException1.class.getName()));
     assertTrue(MockSyncTool1.syncSuccess);
     assertTrue(MockSyncTool2.syncSuccess);
@@ -73,7 +73,7 @@ public class TestMultipleMetaSync extends 
HoodieDeltaStreamerTestBase {
     MockSyncTool1.syncSuccess = false;
     MockSyncTool2.syncSuccess = false;
     HoodieDeltaStreamer.Config cfg = getConfig(tableBasePath, 
getSyncNames("MockSyncTool1", "MockSyncTool2", "MockSyncToolException1", 
"MockSyncToolException2"));
-    Exception e = assertThrows(HoodieException.class, () -> new 
HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync());
+    Exception e = assertThrows(HoodieMetaSyncException.class, () -> new 
HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync());
     
assertTrue(e.getMessage().contains(MockSyncToolException1.class.getName()));
     
assertTrue(e.getMessage().contains(MockSyncToolException2.class.getName()));
     assertTrue(MockSyncTool1.syncSuccess);
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestChainedTransformer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestChainedTransformer.java
index 26768324946..80a2f8bc6e4 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestChainedTransformer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestChainedTransformer.java
@@ -20,6 +20,7 @@
 package org.apache.hudi.utilities.functional;
 
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+import org.apache.hudi.utilities.exception.HoodieTransformPlanException;
 import org.apache.hudi.utilities.transform.ChainedTransformer;
 import org.apache.hudi.utilities.transform.Transformer;
 
@@ -87,7 +88,7 @@ public class TestChainedTransformer extends 
SparkClientFunctionalTestHarness {
       ChainedTransformer transformer = new 
ChainedTransformer(Arrays.asList(transformerName.split(",")));
       fail();
     } catch (Exception e) {
-      assertTrue(e instanceof IllegalArgumentException, e.getMessage());
+      assertTrue(e instanceof HoodieTransformPlanException, e.getMessage());
     }
   }
 
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestErrorTableAwareChainedTransformer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestErrorTableAwareChainedTransformer.java
index d86ce15113f..e9f0ee7ebea 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestErrorTableAwareChainedTransformer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestErrorTableAwareChainedTransformer.java
@@ -22,6 +22,7 @@ package org.apache.hudi.utilities.functional;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.exception.HoodieValidationException;
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+import org.apache.hudi.utilities.exception.HoodieTransformException;
 import org.apache.hudi.utilities.transform.ErrorTableAwareChainedTransformer;
 import org.apache.hudi.utilities.transform.Transformer;
 import org.apache.spark.sql.Column;
@@ -132,7 +133,7 @@ public class TestErrorTableAwareChainedTransformer extends 
SparkClientFunctional
       ErrorTableAwareChainedTransformer transformer = new 
ErrorTableAwareChainedTransformer(Arrays.asList(transformerName.split(",")));
       fail();
     } catch (Exception e) {
-      assertTrue(e instanceof IllegalArgumentException, e.getMessage());
+      assertTrue(e instanceof HoodieTransformException, e.getMessage());
     }
   }
 
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java
index fadaa3fd217..13816351849 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java
@@ -44,7 +44,6 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 
-import java.io.IOException;
 import java.util.UUID;
 import java.util.stream.Stream;
 
@@ -170,7 +169,7 @@ public abstract class TestAbstractDebeziumSource extends 
UtilitiesTestBase {
     }
 
     @Override
-    public String fetchSchemaFromRegistry(String registryUrl) throws 
IOException {
+    public String fetchSchemaFromRegistry(String registryUrl) {
       return schema;
     }
   }
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlFileBasedTransformer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlFileBasedTransformer.java
index 1c9152755ca..7f9fe7e1e0e 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlFileBasedTransformer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlFileBasedTransformer.java
@@ -20,7 +20,7 @@
 package org.apache.hudi.utilities.transform;
 
 import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.utilities.exception.HoodieTransformException;
 import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
 
 import org.apache.spark.sql.Dataset;
@@ -101,7 +101,7 @@ public class TestSqlFileBasedTransformer extends 
UtilitiesTestBase {
         "hoodie.deltastreamer.transformer.sql.file",
         UtilitiesTestBase.basePath + "/non-exist-sql-file.sql");
     assertThrows(
-        HoodieIOException.class,
+        HoodieTransformException.class,
         () -> sqlFileTransformer.apply(jsc, sparkSession, inputDatasetRows, 
props));
   }
 

Reply via email to