This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new d5abf8f506 [BugFix][Connector-V2][Maxcompute]fix:Maxcompute sink can't
map field(#7164) (#7168)
d5abf8f506 is described below
commit d5abf8f506c1a7674572077fa685da3241564fb6
Author: Zhihong Pan <[email protected]>
AuthorDate: Tue Jul 16 22:14:29 2024 +0800
[BugFix][Connector-V2][Maxcompute]fix:Maxcompute sink can't map
field(#7164) (#7168)
---
.../seatunnel/maxcompute/sink/MaxcomputeSink.java | 2 +-
.../seatunnel/maxcompute/sink/MaxcomputeWriter.java | 9 +++++++--
.../maxcompute/util/MaxcomputeTypeMapper.java | 19 ++++++++++++++-----
.../seatunnel/maxcompute/BasicTypeToOdpsTypeTest.java | 3 ++-
4 files changed, 24 insertions(+), 9 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java
index c5acadb173..6abce7e417 100644
---
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java
@@ -59,6 +59,6 @@ public class MaxcomputeSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
@Override
public AbstractSinkWriter<SeaTunnelRow, Void>
createWriter(SinkWriter.Context context) {
- return new MaxcomputeWriter(this.pluginConfig);
+ return new MaxcomputeWriter(this.pluginConfig, this.typeInfo);
}
}
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java
index c6ee285a4b..51492ae591 100644
---
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java
@@ -20,6 +20,7 @@ package
org.apache.seatunnel.connectors.seatunnel.maxcompute.sink;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import
org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException;
@@ -46,9 +47,11 @@ public class MaxcomputeWriter extends
AbstractSinkWriter<SeaTunnelRow, Void> {
private final TableTunnel.UploadSession session;
private final TableSchema tableSchema;
private static final Long BLOCK_0 = 0L;
+ private SeaTunnelRowType rowType;
- public MaxcomputeWriter(Config pluginConfig) {
+ public MaxcomputeWriter(Config pluginConfig, SeaTunnelRowType rowType) {
try {
+ this.rowType = rowType;
Table table = MaxcomputeUtil.getTable(pluginConfig);
this.tableSchema = table.getSchema();
TableTunnel tunnel = MaxcomputeUtil.getTableTunnel(pluginConfig);
@@ -76,7 +79,9 @@ public class MaxcomputeWriter extends
AbstractSinkWriter<SeaTunnelRow, Void> {
@Override
public void write(SeaTunnelRow seaTunnelRow) throws IOException {
- Record record =
MaxcomputeTypeMapper.getMaxcomputeRowData(seaTunnelRow, this.tableSchema);
+ Record record =
+ MaxcomputeTypeMapper.getMaxcomputeRowData(
+ seaTunnelRow, this.tableSchema, this.rowType);
recordWriter.write(record);
}
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java
index fccc056274..2a3eda909a 100644
---
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java
@@ -67,14 +67,23 @@ public class MaxcomputeTypeMapper implements Serializable {
return new SeaTunnelRow(fields.toArray());
}
- public static Record getMaxcomputeRowData(SeaTunnelRow seaTunnelRow,
TableSchema tableSchema) {
+ public static Record getMaxcomputeRowData(
+ SeaTunnelRow seaTunnelRow, TableSchema tableSchema,
SeaTunnelRowType rowType) {
ArrayRecord arrayRecord = new ArrayRecord(tableSchema);
- List<Column> columns = tableSchema.getColumns();
for (int i = 0; i < seaTunnelRow.getFields().length; i++) {
+ String fieldName = rowType.getFieldName(i);
+ if (!tableSchema.containsColumn(fieldName)) {
+ throw new MaxcomputeConnectorException(
+ CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
+ String.format(
+ "field not found in written table: %s,rowType:
%s",
+ fieldName, seaTunnelRow.getField(i)));
+ }
+ Column column = tableSchema.getColumn(fieldName);
+
arrayRecord.set(
- i,
- resolveObject2Maxcompute(
- seaTunnelRow.getField(i),
columns.get(i).getTypeInfo()));
+ tableSchema.getColumnIndex(fieldName),
+ resolveObject2Maxcompute(seaTunnelRow.getField(i),
column.getTypeInfo()));
}
return arrayRecord;
}
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/BasicTypeToOdpsTypeTest.java
b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/BasicTypeToOdpsTypeTest.java
index 0eeff7c4d3..d4542af820 100644
---
a/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/BasicTypeToOdpsTypeTest.java
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/BasicTypeToOdpsTypeTest.java
@@ -53,7 +53,8 @@ public class BasicTypeToOdpsTypeTest {
}
SeaTunnelRow seaTunnelRow =
MaxcomputeTypeMapper.getSeaTunnelRowData(record, typeInfo);
- Record tRecord =
MaxcomputeTypeMapper.getMaxcomputeRowData(seaTunnelRow, tableSchema);
+ Record tRecord =
+ MaxcomputeTypeMapper.getMaxcomputeRowData(seaTunnelRow,
tableSchema, typeInfo);
for (int i = 0; i < tRecord.getColumns().length; i++) {
Assertions.assertEquals(record.get(i), tRecord.get(i));