This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/api-draft by this push:
new 5ecdebf8 [Api-Draft] Spark InternalRow Serialization (#1918)
5ecdebf8 is described below
commit 5ecdebf860a56ec0ddc085fe2b6d9c065e709642
Author: Zongwen Li <[email protected]>
AuthorDate: Thu May 19 13:21:30 2022 +0800
[Api-Draft] Spark InternalRow Serialization (#1918)
---
.../serialization/InternalRowSerialization.java | 46 +++++++++++++++++--
.../spark/serialization/SparkRowSerialization.java | 53 ----------------------
.../translation/spark/sink/SparkDataWriter.java | 10 ++--
.../spark/source/InternalRowCollector.java | 6 ++-
.../source/batch/BatchParallelSourceReader.java | 2 +-
.../spark/source/batch/BatchPartition.java | 7 ++-
.../spark/source/batch/BatchPartitionReader.java | 7 ++-
.../continnous/ContinuousParallelSourceReader.java | 2 +-
.../source/continnous/ContinuousPartition.java | 6 ++-
.../continnous/ContinuousPartitionReader.java | 5 +-
.../micro/MicroBatchParallelSourceReader.java | 2 +-
.../spark/source/micro/MicroBatchPartition.java | 6 ++-
.../source/micro/MicroBatchPartitionReader.java | 8 ++--
13 files changed, 82 insertions(+), 78 deletions(-)
diff --git
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowSerialization.java
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowSerialization.java
index 0a0a4378..8b2ba13d 100644
---
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowSerialization.java
+++
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowSerialization.java
@@ -21,19 +21,57 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.translation.serialization.RowSerialization;
import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow;
+import org.apache.spark.sql.types.StructType;
import java.io.IOException;
-// TODO:
-public class InternalRowSerialization implements RowSerialization<InternalRow> {
+public final class InternalRowSerialization implements RowSerialization<InternalRow> {
+
+ private final StructType sparkSchema;
+
+ public InternalRowSerialization(StructType sparkSchema) {
+ this.sparkSchema = sparkSchema;
+ }
@Override
    public InternalRow serialize(SeaTunnelRow seaTunnelRow) throws IOException {
- return null;
+ SpecificInternalRow sparkRow = new SpecificInternalRow(sparkSchema);
+ Object[] fields = seaTunnelRow.getFields();
+ for (int i = 0; i < fields.length; i++) {
+ setField(fields[i], i, sparkRow);
+ }
+ return sparkRow;
}
@Override
public SeaTunnelRow deserialize(InternalRow engineRow) throws IOException {
- return null;
+ Object[] fields = new Object[engineRow.numFields()];
+ for (int i = 0; i < engineRow.numFields(); i++) {
+ fields[i] = engineRow.get(i, sparkSchema.apply(i).dataType());
+ }
+ return new SeaTunnelRow(fields);
+ }
+
+ private void setField(Object field, int index, InternalRow sparkRow) {
+ if (field == null) {
+ sparkRow.setNullAt(index);
+ } else if (field instanceof Byte) {
+ sparkRow.setByte(index, (byte) field);
+ } else if (field instanceof Short) {
+ sparkRow.setShort(index, (short) field);
+ } else if (field instanceof Integer) {
+ sparkRow.setInt(index, (int) field);
+ } else if (field instanceof Long) {
+ sparkRow.setLong(index, (long) field);
+ } else if (field instanceof Boolean) {
+ sparkRow.setBoolean(index, (boolean) field);
+ } else if (field instanceof Double) {
+ sparkRow.setDouble(index, (double) field);
+ } else if (field instanceof Float) {
+ sparkRow.setFloat(index, (float) field);
+ } else {
+            throw new RuntimeException(String.format("Unsupported data type: %s", field.getClass()));
+ }
}
}
diff --git
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/serialization/SparkRowSerialization.java
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/serialization/SparkRowSerialization.java
deleted file mode 100644
index 5d9aa995..00000000
---
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/serialization/SparkRowSerialization.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.translation.spark.serialization;
-
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.translation.serialization.RowSerialization;
-
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.RowFactory;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.types.StructType;
-
-import java.io.IOException;
-
-public class SparkRowSerialization implements RowSerialization<Row> {
-
- @Override
- public Row serialize(SeaTunnelRow seaTunnelRow) throws IOException {
- return RowFactory.create(seaTunnelRow.getFields());
- }
-
- @Override
- public SeaTunnelRow deserialize(Row engineRow) throws IOException {
- Object[] fields = new Object[engineRow.length()];
- for (int i = 0; i < engineRow.length(); i++) {
- fields[i] = engineRow.get(i);
- }
- return new SeaTunnelRow(fields);
- }
-
-    public SeaTunnelRow deserialize(StructType schema, InternalRow engineRow) throws IOException {
- Object[] fields = new Object[engineRow.numFields()];
- for (int i = 0; i < engineRow.numFields(); i++) {
- fields[i] = engineRow.get(i, schema.apply(i).dataType());
- }
- return new SeaTunnelRow(fields);
- }
-}
diff --git
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java
index e17d99e4..c79367a5 100644
---
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java
+++
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java
@@ -20,7 +20,8 @@ package org.apache.seatunnel.translation.spark.sink;
import org.apache.seatunnel.api.sink.SinkCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.translation.spark.serialization.SparkRowSerialization;
+import org.apache.seatunnel.translation.serialization.RowSerialization;
+import org.apache.seatunnel.translation.spark.serialization.InternalRowSerialization;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.writer.DataWriter;
@@ -38,20 +39,19 @@ public class SparkDataWriter<CommitInfoT, StateT>
implements DataWriter<Internal
@Nullable
private final SinkCommitter<CommitInfoT> sinkCommitter;
- private final StructType schema;
-    private final SparkRowSerialization rowSerialization = new SparkRowSerialization();
+ private final RowSerialization<InternalRow> rowSerialization;
SparkDataWriter(SinkWriter<SeaTunnelRow, CommitInfoT, StateT> sinkWriter,
SinkCommitter<CommitInfoT> sinkCommitter,
StructType schema) {
this.sinkWriter = sinkWriter;
this.sinkCommitter = sinkCommitter;
- this.schema = schema;
+ this.rowSerialization = new InternalRowSerialization(schema);
}
@Override
public void write(InternalRow record) throws IOException {
- sinkWriter.write(rowSerialization.deserialize(schema, record));
+ sinkWriter.write(rowSerialization.deserialize(record));
}
@Override
diff --git
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/InternalRowCollector.java
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/InternalRowCollector.java
index 1b31014d..b5678eb8 100644
---
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/InternalRowCollector.java
+++
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/InternalRowCollector.java
@@ -23,15 +23,17 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import
org.apache.seatunnel.translation.spark.serialization.InternalRowSerialization;
import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
public class InternalRowCollector implements Collector<SeaTunnelRow> {
private final Handover<InternalRow> handover;
private final CheckpointLock checkpointLock;
-    private final InternalRowSerialization rowSerialization = new InternalRowSerialization();
+ private final InternalRowSerialization rowSerialization;
-    public InternalRowCollector(Handover<InternalRow> handover, CheckpointLock checkpointLock) {
+    public InternalRowCollector(Handover<InternalRow> handover, CheckpointLock checkpointLock, StructType sparkSchema) {
this.handover = handover;
this.checkpointLock = checkpointLock;
+ this.rowSerialization = new InternalRowSerialization(sparkSchema);
}
@Override
diff --git
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchParallelSourceReader.java
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchParallelSourceReader.java
index ae1d6c60..0498bf28 100644
---
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchParallelSourceReader.java
+++
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchParallelSourceReader.java
@@ -49,7 +49,7 @@ public class BatchParallelSourceReader implements
DataSourceReader {
public List<InputPartition<InternalRow>> planInputPartitions() {
List<InputPartition<InternalRow>> virtualPartitions = new
ArrayList<>(parallelism);
for (int subtaskId = 0; subtaskId < parallelism; subtaskId++) {
-            virtualPartitions.add(new BatchPartition(source, parallelism, subtaskId));
+            virtualPartitions.add(new BatchPartition(source, parallelism, subtaskId, rowType));
}
return virtualPartitions;
}
diff --git
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchPartition.java
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchPartition.java
index 0fd42477..321f8ae8 100644
---
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchPartition.java
+++
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchPartition.java
@@ -23,20 +23,23 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
+import org.apache.spark.sql.types.StructType;
public class BatchPartition implements InputPartition<InternalRow> {
protected final SeaTunnelSource<SeaTunnelRow, ?, ?> source;
protected final Integer parallelism;
protected final Integer subtaskId;
+ protected final StructType rowType;
-    public BatchPartition(SeaTunnelSource<SeaTunnelRow, ?, ?> source, Integer parallelism, Integer subtaskId) {
+    public BatchPartition(SeaTunnelSource<SeaTunnelRow, ?, ?> source, Integer parallelism, Integer subtaskId, StructType rowType) {
this.source = source;
this.parallelism = parallelism;
this.subtaskId = subtaskId;
+ this.rowType = rowType;
}
@Override
public InputPartitionReader<InternalRow> createPartitionReader() {
- return new BatchPartitionReader(source, parallelism, subtaskId);
+        return new BatchPartitionReader(source, parallelism, subtaskId, rowType);
}
}
diff --git
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchPartitionReader.java
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchPartitionReader.java
index 45999f62..51fecc17 100644
---
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchPartitionReader.java
+++
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchPartitionReader.java
@@ -29,6 +29,7 @@ import
org.apache.seatunnel.translation.util.ThreadPoolExecutorFactory;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
+import org.apache.spark.sql.types.StructType;
import java.io.IOException;
import java.util.List;
@@ -40,6 +41,7 @@ public class BatchPartitionReader implements
InputPartitionReader<InternalRow> {
protected final SeaTunnelSource<SeaTunnelRow, ?, ?> source;
protected final Integer parallelism;
protected final Integer subtaskId;
+ protected final StructType rowType;
protected final ExecutorService executorService;
protected final Handover<InternalRow> handover;
@@ -50,10 +52,11 @@ public class BatchPartitionReader implements
InputPartitionReader<InternalRow> {
protected volatile ParallelSource<SeaTunnelRow, ?, ?> parallelSource;
protected volatile Collector<SeaTunnelRow> collector;
-    public BatchPartitionReader(SeaTunnelSource<SeaTunnelRow, ?, ?> source, Integer parallelism, Integer subtaskId) {
+    public BatchPartitionReader(SeaTunnelSource<SeaTunnelRow, ?, ?> source, Integer parallelism, Integer subtaskId, StructType rowType) {
this.source = source;
this.parallelism = parallelism;
this.subtaskId = subtaskId;
+ this.rowType = rowType;
this.executorService =
ThreadPoolExecutorFactory.createScheduledThreadPoolExecutor(1,
String.format("parallel-split-enumerator-executor-%s", subtaskId));
this.handover = new Handover<>();
}
@@ -94,7 +97,7 @@ public class BatchPartitionReader implements
InputPartitionReader<InternalRow> {
}
protected Collector<SeaTunnelRow> createCollector() {
- return new InternalRowCollector(handover, new EmptyLock());
+ return new InternalRowCollector(handover, new EmptyLock(), rowType);
}
protected ParallelSource<SeaTunnelRow, ?, ?> createParallelSource() {
diff --git
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousParallelSourceReader.java
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousParallelSourceReader.java
index 0809a2fd..1c1f05ac 100644
---
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousParallelSourceReader.java
+++
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousParallelSourceReader.java
@@ -109,7 +109,7 @@ public class ContinuousParallelSourceReader implements
ContinuousReader {
List<InputPartition<InternalRow>> virtualPartitions = new
ArrayList<>(parallelism);
for (int subtaskId = 0; subtaskId < parallelism; subtaskId++) {
ReaderState readerState = readerStateMap.get(subtaskId);
-            virtualPartitions.add(new ContinuousPartition(source, parallelism, subtaskId, checkpointId, readerState == null ? null : readerState.getBytes()));
+            virtualPartitions.add(new ContinuousPartition(source, parallelism, subtaskId, rowType, checkpointId, readerState == null ? null : readerState.getBytes()));
}
return virtualPartitions;
}
diff --git
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousPartition.java
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousPartition.java
index 49200301..52f0a9d9 100644
---
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousPartition.java
+++
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousPartition.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
+import org.apache.spark.sql.types.StructType;
import java.util.List;
@@ -30,23 +31,26 @@ public class ContinuousPartition implements
InputPartition<InternalRow> {
protected final SeaTunnelSource<SeaTunnelRow, ?, ?> source;
protected final Integer parallelism;
protected final Integer subtaskId;
+ protected final StructType rowType;
protected final Integer checkpointId;
protected final List<byte[]> restoredState;
public ContinuousPartition(SeaTunnelSource<SeaTunnelRow, ?, ?> source,
Integer parallelism,
Integer subtaskId,
+ StructType rowType,
Integer checkpointId,
List<byte[]> restoredState) {
this.source = source;
this.parallelism = parallelism;
this.subtaskId = subtaskId;
+ this.rowType = rowType;
this.checkpointId = checkpointId;
this.restoredState = restoredState;
}
@Override
public InputPartitionReader<InternalRow> createPartitionReader() {
-        return new ContinuousPartitionReader(source, parallelism, subtaskId, checkpointId, restoredState);
+        return new ContinuousPartitionReader(source, parallelism, subtaskId, rowType, checkpointId, restoredState);
}
}
diff --git
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousPartitionReader.java
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousPartitionReader.java
index 463f6e99..180d3ef2 100644
---
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousPartitionReader.java
+++
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousPartitionReader.java
@@ -26,6 +26,7 @@ import
org.apache.seatunnel.translation.spark.source.batch.BatchPartitionReader;
import org.apache.spark.sql.catalyst.InternalRow;
import
org.apache.spark.sql.sources.v2.reader.streaming.ContinuousInputPartitionReader;
import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset;
+import org.apache.spark.sql.types.StructType;
import java.io.IOException;
import java.util.List;
@@ -34,8 +35,8 @@ public class ContinuousPartitionReader extends
BatchPartitionReader implements C
protected volatile Integer checkpointId;
protected final List<byte[]> restoredState;
-    public ContinuousPartitionReader(SeaTunnelSource<SeaTunnelRow, ?, ?> source, Integer parallelism, Integer subtaskId, Integer checkpointId, List<byte[]> restoredState) {
-        super(source, parallelism, subtaskId);
+    public ContinuousPartitionReader(SeaTunnelSource<SeaTunnelRow, ?, ?> source, Integer parallelism, Integer subtaskId, StructType rowType, Integer checkpointId, List<byte[]> restoredState) {
+        super(source, parallelism, subtaskId, rowType);
this.checkpointId = checkpointId;
this.restoredState = restoredState;
}
diff --git
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/MicroBatchParallelSourceReader.java
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/MicroBatchParallelSourceReader.java
index 23b29c7f..a387cb54 100644
---
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/MicroBatchParallelSourceReader.java
+++
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/MicroBatchParallelSourceReader.java
@@ -92,7 +92,7 @@ public class MicroBatchParallelSourceReader implements
MicroBatchReader {
for (int subtaskId = 0; subtaskId < parallelism; subtaskId++) {
// TODO: get state
List<byte[]> subtaskState = null;
-            virtualPartitions.add(new MicroBatchPartition(source, parallelism, subtaskId, checkpointId, checkpointInterval, subtaskState));
+            virtualPartitions.add(new MicroBatchPartition(source, parallelism, subtaskId, rowType, checkpointId, checkpointInterval, subtaskState));
}
checkpointId++;
return virtualPartitions;
diff --git
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/MicroBatchPartition.java
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/MicroBatchPartition.java
index 3e3dde62..4b018cd9 100644
---
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/MicroBatchPartition.java
+++
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/MicroBatchPartition.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
+import org.apache.spark.sql.types.StructType;
import java.util.List;
@@ -30,6 +31,7 @@ public class MicroBatchPartition implements
InputPartition<InternalRow> {
protected final SeaTunnelSource<SeaTunnelRow, ?, ?> source;
protected final Integer parallelism;
protected final Integer subtaskId;
+ protected final StructType rowType;
protected final Integer checkpointId;
protected final Integer checkpointInterval;
protected final List<byte[]> restoredState;
@@ -37,12 +39,14 @@ public class MicroBatchPartition implements
InputPartition<InternalRow> {
public MicroBatchPartition(SeaTunnelSource<SeaTunnelRow, ?, ?> source,
Integer parallelism,
Integer subtaskId,
+ StructType rowType,
Integer checkpointId,
Integer checkpointInterval,
List<byte[]> restoredState) {
this.source = source;
this.parallelism = parallelism;
this.subtaskId = subtaskId;
+ this.rowType = rowType;
this.checkpointId = checkpointId;
this.checkpointInterval = checkpointInterval;
this.restoredState = restoredState;
@@ -50,6 +54,6 @@ public class MicroBatchPartition implements
InputPartition<InternalRow> {
@Override
public InputPartitionReader<InternalRow> createPartitionReader() {
-        return new MicroBatchPartitionReader(source, parallelism, subtaskId, checkpointId, checkpointInterval, restoredState);
+        return new MicroBatchPartitionReader(source, parallelism, subtaskId, rowType, checkpointId, checkpointInterval, restoredState);
}
}
diff --git
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/MicroBatchPartitionReader.java
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/MicroBatchPartitionReader.java
index 46f40404..617282cc 100644
---
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/MicroBatchPartitionReader.java
+++
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/MicroBatchPartitionReader.java
@@ -27,6 +27,8 @@ import
org.apache.seatunnel.translation.spark.source.ReaderState;
import
org.apache.seatunnel.translation.spark.source.batch.BatchPartitionReader;
import org.apache.seatunnel.translation.util.ThreadPoolExecutorFactory;
+import org.apache.spark.sql.types.StructType;
+
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -41,8 +43,8 @@ public class MicroBatchPartitionReader extends
BatchPartitionReader {
protected final Integer checkpointInterval;
protected ScheduledThreadPoolExecutor executor;
-    public MicroBatchPartitionReader(SeaTunnelSource<SeaTunnelRow, ?, ?> source, Integer parallelism, Integer subtaskId, Integer checkpointId, Integer checkpointInterval, List<byte[]> restoredState) {
-        super(source, parallelism, subtaskId);
+    public MicroBatchPartitionReader(SeaTunnelSource<SeaTunnelRow, ?, ?> source, Integer parallelism, Integer subtaskId, StructType rowType, Integer checkpointId, Integer checkpointInterval, List<byte[]> restoredState) {
+        super(source, parallelism, subtaskId, rowType);
this.checkpointId = checkpointId;
this.restoredState = restoredState;
this.checkpointInterval = checkpointInterval;
@@ -58,7 +60,7 @@ public class MicroBatchPartitionReader extends
BatchPartitionReader {
@Override
protected Collector<SeaTunnelRow> createCollector() {
- return new InternalRowCollector(handover, checkpointLock);
+ return new InternalRowCollector(handover, checkpointLock, rowType);
}
@Override