This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/api-draft by this push:
     new 38d5c688 fix spark example can't run error. (#1921)
38d5c688 is described below

commit 38d5c688137989f93e3eb7c0b1f93fc5747c4ee7
Author: TrickyZerg <[email protected]>
AuthorDate: Thu May 19 17:40:47 2022 +0800

    fix spark example can't run error. (#1921)
---
 .../translation/spark/serialization/InternalRowSerialization.java  | 5 ++++-
 .../translation/spark/source/batch/BatchPartitionReader.java       | 7 +++++++
 2 files changed, 11 insertions(+), 1 deletion(-)

diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowSerialization.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowSerialization.java
index 8b2ba13d..fe6f0a62 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowSerialization.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowSerialization.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.translation.serialization.RowSerialization;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
 
 import java.io.IOException;
 
@@ -39,7 +40,7 @@ public final class InternalRowSerialization implements RowSerialization<Internal
         SpecificInternalRow sparkRow = new SpecificInternalRow(sparkSchema);
         Object[] fields = seaTunnelRow.getFields();
         for (int i = 0; i < fields.length; i++) {
-            setField(fields[i], i,  sparkRow);
+            setField(fields[i], i, sparkRow);
         }
         return sparkRow;
     }
@@ -70,6 +71,8 @@ public final class InternalRowSerialization implements 
RowSerialization<Internal
             sparkRow.setDouble(index, (double) field);
         } else if (field instanceof Float) {
             sparkRow.setFloat(index, (float) field);
+        } else if (field instanceof String) {
+            sparkRow.update(index, UTF8String.fromString(field.toString()));
         } else {
             throw new RuntimeException(String.format("Unsupported data type: %s", field.getClass()));
         }
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchPartitionReader.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchPartitionReader.java
index 51fecc17..4ed7cfb1 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchPartitionReader.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchPartitionReader.java
@@ -30,12 +30,17 @@ import org.apache.seatunnel.translation.util.ThreadPoolExecutorFactory;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
 import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 
 public class BatchPartitionReader implements InputPartitionReader<InternalRow> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(BatchPartitionReader.class);
+
     protected static final Integer INTERVAL = 100;
 
     protected final SeaTunnelSource<SeaTunnelRow, ?, ?> source;
@@ -91,6 +96,8 @@ public class BatchPartitionReader implements InputPartitionReader<InternalRow> {
                 parallelSource.run(collector);
             } catch (Exception e) {
                 handover.reportError(e);
+                LOGGER.error("ParallelSource execute failed.", e);
+                running = false;
             }
         });
         prepare = false;

Reply via email to