This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/api-draft by this push:
     new 5ecdebf8 [Api-Draft] Spark InternalRow Serialization (#1918)
5ecdebf8 is described below

commit 5ecdebf860a56ec0ddc085fe2b6d9c065e709642
Author: Zongwen Li <[email protected]>
AuthorDate: Thu May 19 13:21:30 2022 +0800

    [Api-Draft] Spark InternalRow Serialization (#1918)
---
 .../serialization/InternalRowSerialization.java    | 46 +++++++++++++++++--
 .../spark/serialization/SparkRowSerialization.java | 53 ----------------------
 .../translation/spark/sink/SparkDataWriter.java    | 10 ++--
 .../spark/source/InternalRowCollector.java         |  6 ++-
 .../source/batch/BatchParallelSourceReader.java    |  2 +-
 .../spark/source/batch/BatchPartition.java         |  7 ++-
 .../spark/source/batch/BatchPartitionReader.java   |  7 ++-
 .../continnous/ContinuousParallelSourceReader.java |  2 +-
 .../source/continnous/ContinuousPartition.java     |  6 ++-
 .../continnous/ContinuousPartitionReader.java      |  5 +-
 .../micro/MicroBatchParallelSourceReader.java      |  2 +-
 .../spark/source/micro/MicroBatchPartition.java    |  6 ++-
 .../source/micro/MicroBatchPartitionReader.java    |  8 ++--
 13 files changed, 82 insertions(+), 78 deletions(-)

diff --git 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowSerialization.java
 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowSerialization.java
index 0a0a4378..8b2ba13d 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowSerialization.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowSerialization.java
@@ -21,19 +21,57 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.translation.serialization.RowSerialization;
 
 import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow;
+import org.apache.spark.sql.types.StructType;
 
 import java.io.IOException;
 
-// TODO:
-public class InternalRowSerialization implements RowSerialization<InternalRow> 
{
+public final class InternalRowSerialization implements 
RowSerialization<InternalRow> {
+
+    private final StructType sparkSchema;
+
+    public InternalRowSerialization(StructType sparkSchema) {
+        this.sparkSchema = sparkSchema;
+    }
 
     @Override
     public InternalRow serialize(SeaTunnelRow seaTunnelRow) throws IOException 
{
-        return null;
+        SpecificInternalRow sparkRow = new SpecificInternalRow(sparkSchema);
+        Object[] fields = seaTunnelRow.getFields();
+        for (int i = 0; i < fields.length; i++) {
+            setField(fields[i], i,  sparkRow);
+        }
+        return sparkRow;
     }
 
     @Override
     public SeaTunnelRow deserialize(InternalRow engineRow) throws IOException {
-        return null;
+        Object[] fields = new Object[engineRow.numFields()];
+        for (int i = 0; i < engineRow.numFields(); i++) {
+            fields[i] = engineRow.get(i, sparkSchema.apply(i).dataType());
+        }
+        return new SeaTunnelRow(fields);
+    }
+
+    private void setField(Object field, int index, InternalRow sparkRow) {
+        if (field == null) {
+            sparkRow.setNullAt(index);
+        } else if (field instanceof Byte) {
+            sparkRow.setByte(index, (byte) field);
+        } else if (field instanceof Short) {
+            sparkRow.setShort(index, (short) field);
+        } else if (field instanceof Integer) {
+            sparkRow.setInt(index, (int) field);
+        } else if (field instanceof Long) {
+            sparkRow.setLong(index, (long) field);
+        } else if (field instanceof Boolean) {
+            sparkRow.setBoolean(index, (boolean) field);
+        } else if (field instanceof Double) {
+            sparkRow.setDouble(index, (double) field);
+        } else if (field instanceof Float) {
+            sparkRow.setFloat(index, (float) field);
+        } else {
+            throw new RuntimeException(String.format("Unsupported data type: 
%s", field.getClass()));
+        }
     }
 }
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/serialization/SparkRowSerialization.java
 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/serialization/SparkRowSerialization.java
deleted file mode 100644
index 5d9aa995..00000000
--- 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/serialization/SparkRowSerialization.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.translation.spark.serialization;
-
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.translation.serialization.RowSerialization;
-
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.RowFactory;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.types.StructType;
-
-import java.io.IOException;
-
-public class SparkRowSerialization implements RowSerialization<Row> {
-
-    @Override
-    public Row serialize(SeaTunnelRow seaTunnelRow) throws IOException {
-        return RowFactory.create(seaTunnelRow.getFields());
-    }
-
-    @Override
-    public SeaTunnelRow deserialize(Row engineRow) throws IOException {
-        Object[] fields = new Object[engineRow.length()];
-        for (int i = 0; i < engineRow.length(); i++) {
-            fields[i] = engineRow.get(i);
-        }
-        return new SeaTunnelRow(fields);
-    }
-
-    public SeaTunnelRow deserialize(StructType schema, InternalRow engineRow) 
throws IOException {
-        Object[] fields = new Object[engineRow.numFields()];
-        for (int i = 0; i < engineRow.numFields(); i++) {
-            fields[i] = engineRow.get(i, schema.apply(i).dataType());
-        }
-        return new SeaTunnelRow(fields);
-    }
-}
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java
 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java
index e17d99e4..c79367a5 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java
@@ -20,7 +20,8 @@ package org.apache.seatunnel.translation.spark.sink;
 import org.apache.seatunnel.api.sink.SinkCommitter;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import 
org.apache.seatunnel.translation.spark.serialization.SparkRowSerialization;
+import org.apache.seatunnel.translation.serialization.RowSerialization;
+import 
org.apache.seatunnel.translation.spark.serialization.InternalRowSerialization;
 
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.sources.v2.writer.DataWriter;
@@ -38,20 +39,19 @@ public class SparkDataWriter<CommitInfoT, StateT> 
implements DataWriter<Internal
 
     @Nullable
     private final SinkCommitter<CommitInfoT> sinkCommitter;
-    private final StructType schema;
-    private final SparkRowSerialization rowSerialization = new 
SparkRowSerialization();
+    private final RowSerialization<InternalRow> rowSerialization;
 
     SparkDataWriter(SinkWriter<SeaTunnelRow, CommitInfoT, StateT> sinkWriter,
                     SinkCommitter<CommitInfoT> sinkCommitter,
                     StructType schema) {
         this.sinkWriter = sinkWriter;
         this.sinkCommitter = sinkCommitter;
-        this.schema = schema;
+        this.rowSerialization = new InternalRowSerialization(schema);
     }
 
     @Override
     public void write(InternalRow record) throws IOException {
-        sinkWriter.write(rowSerialization.deserialize(schema, record));
+        sinkWriter.write(rowSerialization.deserialize(record));
     }
 
     @Override
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/InternalRowCollector.java
 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/InternalRowCollector.java
index 1b31014d..b5678eb8 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/InternalRowCollector.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/InternalRowCollector.java
@@ -23,15 +23,17 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import 
org.apache.seatunnel.translation.spark.serialization.InternalRowSerialization;
 
 import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
 
 public class InternalRowCollector implements Collector<SeaTunnelRow> {
     private final Handover<InternalRow> handover;
     private final CheckpointLock checkpointLock;
-    private final InternalRowSerialization rowSerialization = new 
InternalRowSerialization();
+    private final InternalRowSerialization rowSerialization;
 
-    public InternalRowCollector(Handover<InternalRow> handover, CheckpointLock 
checkpointLock) {
+    public InternalRowCollector(Handover<InternalRow> handover, CheckpointLock 
checkpointLock, StructType sparkSchema) {
         this.handover = handover;
         this.checkpointLock = checkpointLock;
+        this.rowSerialization = new InternalRowSerialization(sparkSchema);
     }
 
     @Override
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchParallelSourceReader.java
 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchParallelSourceReader.java
index ae1d6c60..0498bf28 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchParallelSourceReader.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchParallelSourceReader.java
@@ -49,7 +49,7 @@ public class BatchParallelSourceReader implements 
DataSourceReader {
     public List<InputPartition<InternalRow>> planInputPartitions() {
         List<InputPartition<InternalRow>> virtualPartitions = new 
ArrayList<>(parallelism);
         for (int subtaskId = 0; subtaskId < parallelism; subtaskId++) {
-            virtualPartitions.add(new BatchPartition(source, parallelism, 
subtaskId));
+            virtualPartitions.add(new BatchPartition(source, parallelism, 
subtaskId, rowType));
         }
         return virtualPartitions;
     }
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchPartition.java
 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchPartition.java
index 0fd42477..321f8ae8 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchPartition.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchPartition.java
@@ -23,20 +23,23 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.sources.v2.reader.InputPartition;
 import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
+import org.apache.spark.sql.types.StructType;
 
 public class BatchPartition implements InputPartition<InternalRow> {
     protected final SeaTunnelSource<SeaTunnelRow, ?, ?> source;
     protected final Integer parallelism;
     protected final Integer subtaskId;
+    protected final StructType rowType;
 
-    public BatchPartition(SeaTunnelSource<SeaTunnelRow, ?, ?> source, Integer 
parallelism, Integer subtaskId) {
+    public BatchPartition(SeaTunnelSource<SeaTunnelRow, ?, ?> source, Integer 
parallelism, Integer subtaskId, StructType rowType) {
         this.source = source;
         this.parallelism = parallelism;
         this.subtaskId = subtaskId;
+        this.rowType = rowType;
     }
 
     @Override
     public InputPartitionReader<InternalRow> createPartitionReader() {
-        return new BatchPartitionReader(source, parallelism, subtaskId);
+        return new BatchPartitionReader(source, parallelism, subtaskId, 
rowType);
     }
 }
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchPartitionReader.java
 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchPartitionReader.java
index 45999f62..51fecc17 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchPartitionReader.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchPartitionReader.java
@@ -29,6 +29,7 @@ import 
org.apache.seatunnel.translation.util.ThreadPoolExecutorFactory;
 
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
+import org.apache.spark.sql.types.StructType;
 
 import java.io.IOException;
 import java.util.List;
@@ -40,6 +41,7 @@ public class BatchPartitionReader implements 
InputPartitionReader<InternalRow> {
     protected final SeaTunnelSource<SeaTunnelRow, ?, ?> source;
     protected final Integer parallelism;
     protected final Integer subtaskId;
+    protected final StructType rowType;
 
     protected final ExecutorService executorService;
     protected final Handover<InternalRow> handover;
@@ -50,10 +52,11 @@ public class BatchPartitionReader implements 
InputPartitionReader<InternalRow> {
     protected volatile ParallelSource<SeaTunnelRow, ?, ?> parallelSource;
     protected volatile Collector<SeaTunnelRow> collector;
 
-    public BatchPartitionReader(SeaTunnelSource<SeaTunnelRow, ?, ?> source, 
Integer parallelism, Integer subtaskId) {
+    public BatchPartitionReader(SeaTunnelSource<SeaTunnelRow, ?, ?> source, 
Integer parallelism, Integer subtaskId, StructType rowType) {
         this.source = source;
         this.parallelism = parallelism;
         this.subtaskId = subtaskId;
+        this.rowType = rowType;
         this.executorService = 
ThreadPoolExecutorFactory.createScheduledThreadPoolExecutor(1, 
String.format("parallel-split-enumerator-executor-%s", subtaskId));
         this.handover = new Handover<>();
     }
@@ -94,7 +97,7 @@ public class BatchPartitionReader implements 
InputPartitionReader<InternalRow> {
     }
 
     protected Collector<SeaTunnelRow> createCollector() {
-        return new InternalRowCollector(handover, new EmptyLock());
+        return new InternalRowCollector(handover, new EmptyLock(), rowType);
     }
 
     protected ParallelSource<SeaTunnelRow, ?, ?> createParallelSource() {
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousParallelSourceReader.java
 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousParallelSourceReader.java
index 0809a2fd..1c1f05ac 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousParallelSourceReader.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousParallelSourceReader.java
@@ -109,7 +109,7 @@ public class ContinuousParallelSourceReader implements 
ContinuousReader {
         List<InputPartition<InternalRow>> virtualPartitions = new 
ArrayList<>(parallelism);
         for (int subtaskId = 0; subtaskId < parallelism; subtaskId++) {
             ReaderState readerState = readerStateMap.get(subtaskId);
-            virtualPartitions.add(new ContinuousPartition(source, parallelism, 
subtaskId, checkpointId, readerState == null ? null : readerState.getBytes()));
+            virtualPartitions.add(new ContinuousPartition(source, parallelism, 
subtaskId, rowType, checkpointId, readerState == null ? null : 
readerState.getBytes()));
         }
         return virtualPartitions;
     }
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousPartition.java
 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousPartition.java
index 49200301..52f0a9d9 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousPartition.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousPartition.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.sources.v2.reader.InputPartition;
 import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
+import org.apache.spark.sql.types.StructType;
 
 import java.util.List;
 
@@ -30,23 +31,26 @@ public class ContinuousPartition implements 
InputPartition<InternalRow> {
     protected final SeaTunnelSource<SeaTunnelRow, ?, ?> source;
     protected final Integer parallelism;
     protected final Integer subtaskId;
+    protected final StructType rowType;
     protected final Integer checkpointId;
     protected final List<byte[]> restoredState;
 
     public ContinuousPartition(SeaTunnelSource<SeaTunnelRow, ?, ?> source,
                                Integer parallelism,
                                Integer subtaskId,
+                               StructType rowType,
                                Integer checkpointId,
                                List<byte[]> restoredState) {
         this.source = source;
         this.parallelism = parallelism;
         this.subtaskId = subtaskId;
+        this.rowType = rowType;
         this.checkpointId = checkpointId;
         this.restoredState = restoredState;
     }
 
     @Override
     public InputPartitionReader<InternalRow> createPartitionReader() {
-        return new ContinuousPartitionReader(source, parallelism, subtaskId, 
checkpointId, restoredState);
+        return new ContinuousPartitionReader(source, parallelism, subtaskId, 
rowType, checkpointId, restoredState);
     }
 }
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousPartitionReader.java
 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousPartitionReader.java
index 463f6e99..180d3ef2 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousPartitionReader.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousPartitionReader.java
@@ -26,6 +26,7 @@ import 
org.apache.seatunnel.translation.spark.source.batch.BatchPartitionReader;
 import org.apache.spark.sql.catalyst.InternalRow;
 import 
org.apache.spark.sql.sources.v2.reader.streaming.ContinuousInputPartitionReader;
 import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset;
+import org.apache.spark.sql.types.StructType;
 
 import java.io.IOException;
 import java.util.List;
@@ -34,8 +35,8 @@ public class ContinuousPartitionReader extends 
BatchPartitionReader implements C
     protected volatile Integer checkpointId;
     protected final List<byte[]> restoredState;
 
-    public ContinuousPartitionReader(SeaTunnelSource<SeaTunnelRow, ?, ?> 
source, Integer parallelism, Integer subtaskId, Integer checkpointId, 
List<byte[]> restoredState) {
-        super(source, parallelism, subtaskId);
+    public ContinuousPartitionReader(SeaTunnelSource<SeaTunnelRow, ?, ?> 
source, Integer parallelism, Integer subtaskId, StructType rowType, Integer 
checkpointId, List<byte[]> restoredState) {
+        super(source, parallelism, subtaskId, rowType);
         this.checkpointId = checkpointId;
         this.restoredState = restoredState;
     }
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/MicroBatchParallelSourceReader.java
 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/MicroBatchParallelSourceReader.java
index 23b29c7f..a387cb54 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/MicroBatchParallelSourceReader.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/MicroBatchParallelSourceReader.java
@@ -92,7 +92,7 @@ public class MicroBatchParallelSourceReader implements 
MicroBatchReader {
         for (int subtaskId = 0; subtaskId < parallelism; subtaskId++) {
             // TODO: get state
             List<byte[]> subtaskState = null;
-            virtualPartitions.add(new MicroBatchPartition(source, parallelism, 
subtaskId, checkpointId, checkpointInterval, subtaskState));
+            virtualPartitions.add(new MicroBatchPartition(source, parallelism, 
subtaskId, rowType, checkpointId, checkpointInterval, subtaskState));
         }
         checkpointId++;
         return virtualPartitions;
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/MicroBatchPartition.java
 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/MicroBatchPartition.java
index 3e3dde62..4b018cd9 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/MicroBatchPartition.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/MicroBatchPartition.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.sources.v2.reader.InputPartition;
 import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
+import org.apache.spark.sql.types.StructType;
 
 import java.util.List;
 
@@ -30,6 +31,7 @@ public class MicroBatchPartition implements 
InputPartition<InternalRow> {
     protected final SeaTunnelSource<SeaTunnelRow, ?, ?> source;
     protected final Integer parallelism;
     protected final Integer subtaskId;
+    protected final StructType rowType;
     protected final Integer checkpointId;
     protected final Integer checkpointInterval;
     protected final List<byte[]> restoredState;
@@ -37,12 +39,14 @@ public class MicroBatchPartition implements 
InputPartition<InternalRow> {
     public MicroBatchPartition(SeaTunnelSource<SeaTunnelRow, ?, ?> source,
                                Integer parallelism,
                                Integer subtaskId,
+                               StructType rowType,
                                Integer checkpointId,
                                Integer checkpointInterval,
                                List<byte[]> restoredState) {
         this.source = source;
         this.parallelism = parallelism;
         this.subtaskId = subtaskId;
+        this.rowType = rowType;
         this.checkpointId = checkpointId;
         this.checkpointInterval = checkpointInterval;
         this.restoredState = restoredState;
@@ -50,6 +54,6 @@ public class MicroBatchPartition implements 
InputPartition<InternalRow> {
 
     @Override
     public InputPartitionReader<InternalRow> createPartitionReader() {
-        return new MicroBatchPartitionReader(source, parallelism, subtaskId, 
checkpointId, checkpointInterval, restoredState);
+        return new MicroBatchPartitionReader(source, parallelism, subtaskId, 
rowType, checkpointId, checkpointInterval, restoredState);
     }
 }
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/MicroBatchPartitionReader.java
 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/MicroBatchPartitionReader.java
index 46f40404..617282cc 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/MicroBatchPartitionReader.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/MicroBatchPartitionReader.java
@@ -27,6 +27,8 @@ import 
org.apache.seatunnel.translation.spark.source.ReaderState;
 import 
org.apache.seatunnel.translation.spark.source.batch.BatchPartitionReader;
 import org.apache.seatunnel.translation.util.ThreadPoolExecutorFactory;
 
+import org.apache.spark.sql.types.StructType;
+
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -41,8 +43,8 @@ public class MicroBatchPartitionReader extends 
BatchPartitionReader {
     protected final Integer checkpointInterval;
     protected ScheduledThreadPoolExecutor executor;
 
-    public MicroBatchPartitionReader(SeaTunnelSource<SeaTunnelRow, ?, ?> 
source, Integer parallelism, Integer subtaskId, Integer checkpointId, Integer 
checkpointInterval, List<byte[]> restoredState) {
-        super(source, parallelism, subtaskId);
+    public MicroBatchPartitionReader(SeaTunnelSource<SeaTunnelRow, ?, ?> 
source, Integer parallelism, Integer subtaskId, StructType rowType, Integer 
checkpointId, Integer checkpointInterval, List<byte[]> restoredState) {
+        super(source, parallelism, subtaskId, rowType);
         this.checkpointId = checkpointId;
         this.restoredState = restoredState;
         this.checkpointInterval = checkpointInterval;
@@ -58,7 +60,7 @@ public class MicroBatchPartitionReader extends 
BatchPartitionReader {
 
     @Override
     protected Collector<SeaTunnelRow> createCollector() {
-        return new InternalRowCollector(handover, checkpointLock);
+        return new InternalRowCollector(handover, checkpointLock, rowType);
     }
 
     @Override

Reply via email to