This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git
commit a3a87b49d589061c280cfc982a85ec1f85dd0138
Author: Etienne Chauchot <[email protected]>
AuthorDate: Tue Dec 11 16:00:26 2018 +0100

    Improve exception flow
---
 .../spark/structuredstreaming/translation/io/DatasetSource.java | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java
index 75cdd5d..d23ecf3 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java
@@ -30,7 +30,6 @@ import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.DataSourceRegister;
 import org.apache.spark.sql.sources.v2.ContinuousReadSupport;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.DataSourceV2;
@@ -137,6 +136,8 @@ public class DatasetSource<T> implements DataSourceV2, MicroBatchReadSupport{
       try {
         reader = source.createReader(options);
       } catch (IOException e) {
+        throw new RuntimeException(
+            "Error creating BoundedReader " + reader.getClass().getCanonicalName(), e);
       }
       return new DatasetMicroBatchPartitionReader(reader);
     }
@@ -145,9 +146,9 @@ public class DatasetSource<T> implements DataSourceV2, MicroBatchReadSupport{
         return result;
       } catch (Exception e) {
-        e.printStackTrace();
+        throw new RuntimeException(
+            "Error in splitting BoundedSource " + source.getClass().getCanonicalName(), e);
       }
-      return result;
     }
   }
