This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/api-draft by this push:
new 38d5c688 fix spark example can't run error. (#1921)
38d5c688 is described below
commit 38d5c688137989f93e3eb7c0b1f93fc5747c4ee7
Author: TrickyZerg <[email protected]>
AuthorDate: Thu May 19 17:40:47 2022 +0800
fix spark example can't run error. (#1921)
---
.../translation/spark/serialization/InternalRowSerialization.java | 5 ++++-
.../translation/spark/source/batch/BatchPartitionReader.java | 7 +++++++
2 files changed, 11 insertions(+), 1 deletion(-)
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowSerialization.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowSerialization.java
index 8b2ba13d..fe6f0a62 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowSerialization.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowSerialization.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.translation.serialization.RowSerialization;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow;
import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
import java.io.IOException;
@@ -39,7 +40,7 @@ public final class InternalRowSerialization implements RowSerialization<Internal
SpecificInternalRow sparkRow = new SpecificInternalRow(sparkSchema);
Object[] fields = seaTunnelRow.getFields();
for (int i = 0; i < fields.length; i++) {
- setField(fields[i], i, sparkRow);
+ setField(fields[i], i, sparkRow);
}
return sparkRow;
}
@@ -70,6 +71,8 @@ public final class InternalRowSerialization implements RowSerialization<Internal
sparkRow.setDouble(index, (double) field);
} else if (field instanceof Float) {
sparkRow.setFloat(index, (float) field);
+ } else if (field instanceof String) {
+ sparkRow.update(index, UTF8String.fromString(field.toString()));
} else {
throw new RuntimeException(String.format("Unsupported data type: %s", field.getClass()));
}
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchPartitionReader.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchPartitionReader.java
index 51fecc17..4ed7cfb1 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchPartitionReader.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchPartitionReader.java
@@ -30,12 +30,17 @@ import org.apache.seatunnel.translation.util.ThreadPoolExecutorFactory;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutorService;
public class BatchPartitionReader implements InputPartitionReader<InternalRow> {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(BatchPartitionReader.class);
+
protected static final Integer INTERVAL = 100;
protected final SeaTunnelSource<SeaTunnelRow, ?, ?> source;
@@ -91,6 +96,8 @@ public class BatchPartitionReader implements InputPartitionReader<InternalRow> {
parallelSource.run(collector);
} catch (Exception e) {
handover.reportError(e);
+ LOGGER.error("ParallelSource execute failed.", e);
+ running = false;
}
});
prepare = false;