This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 05c6015b14 [Fix] Fix build failed cause by `JdbcHiveIT` and
`SparkSinkTest` (#5798)
05c6015b14 is described below
commit 05c6015b1491da3f931e3b7cb5719d64d0039c35
Author: Jia Fan <[email protected]>
AuthorDate: Tue Nov 7 17:44:16 2023 +0800
[Fix] Fix build failed cause by `JdbcHiveIT` and `SparkSinkTest` (#5798)
---
.../connectors/seatunnel/jdbc/JdbcHiveIT.java | 2 +-
.../spark/sink/SeaTunnelSinkWithBuffer.java | 106 ------------------
.../translation/spark/sink/SparkSinkTest.java | 118 ++++++++++++++++++++-
3 files changed, 118 insertions(+), 108 deletions(-)
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcHiveIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcHiveIT.java
index 69542598d4..936ea73bbe 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcHiveIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcHiveIT.java
@@ -141,7 +141,7 @@ public class JdbcHiveIT extends AbstractJdbcIT {
}
@Override
- void compareResult() {}
+ void compareResult(String executeKey) {}
@Override
String driverUrl() {
diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/test/java/org/apache/seatunnel/translation/spark/sink/SeaTunnelSinkWithBuffer.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/test/java/org/apache/seatunnel/translation/spark/sink/SeaTunnelSinkWithBuffer.java
index e16a29466b..ab1ac3f00e 100644
--- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/test/java/org/apache/seatunnel/translation/spark/sink/SeaTunnelSinkWithBuffer.java
+++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/test/java/org/apache/seatunnel/translation/spark/sink/SeaTunnelSinkWithBuffer.java
@@ -19,14 +19,7 @@ package org.apache.seatunnel.translation.spark.sink;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.ArrayType;
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.DecimalType;
-import org.apache.seatunnel.api.table.type.LocalTimeType;
-import org.apache.seatunnel.api.table.type.MapType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import java.io.IOException;
@@ -37,105 +30,6 @@ public class SeaTunnelSinkWithBuffer implements SeaTunnelSink<SeaTunnelRow, Void
return "SeaTunnelSinkWithBuffer";
}
- @Override
- public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
- return new SeaTunnelRowType(
- new String[] {
- "int",
- "string",
- "boolean",
- "float",
- "double",
- "byte",
- "short",
- "long",
- "decimal",
- "date",
- "timestamp",
- "null",
- "array_string",
- "array_boolean",
- "array_byte",
- "array_short",
- "array_int",
- "array_long",
- "array_float",
- "array_double",
- "map",
- "row"
- },
- new SeaTunnelDataType[] {
- BasicType.INT_TYPE,
- BasicType.STRING_TYPE,
- BasicType.BOOLEAN_TYPE,
- BasicType.FLOAT_TYPE,
- BasicType.DOUBLE_TYPE,
- BasicType.BYTE_TYPE,
- BasicType.SHORT_TYPE,
- BasicType.LONG_TYPE,
- new DecimalType(10, 2),
- LocalTimeType.LOCAL_DATE_TYPE,
- LocalTimeType.LOCAL_DATE_TIME_TYPE,
- BasicType.VOID_TYPE,
- ArrayType.STRING_ARRAY_TYPE,
- ArrayType.BOOLEAN_ARRAY_TYPE,
- ArrayType.BYTE_ARRAY_TYPE,
- ArrayType.SHORT_ARRAY_TYPE,
- ArrayType.INT_ARRAY_TYPE,
- ArrayType.LONG_ARRAY_TYPE,
- ArrayType.FLOAT_ARRAY_TYPE,
- ArrayType.DOUBLE_ARRAY_TYPE,
- new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE),
- new SeaTunnelRowType(
- new String[] {
- "int",
- "string",
- "boolean",
- "float",
- "double",
- "byte",
- "short",
- "long",
- "decimal",
- "date",
- "timestamp",
- "null",
- "array_string",
- "array_boolean",
- "array_byte",
- "array_short",
- "array_int",
- "array_long",
- "array_float",
- "array_double",
- "map"
- },
- new SeaTunnelDataType[] {
- BasicType.INT_TYPE,
- BasicType.STRING_TYPE,
- BasicType.BOOLEAN_TYPE,
- BasicType.FLOAT_TYPE,
- BasicType.DOUBLE_TYPE,
- BasicType.BYTE_TYPE,
- BasicType.SHORT_TYPE,
- BasicType.LONG_TYPE,
- new DecimalType(10, 2),
- LocalTimeType.LOCAL_DATE_TYPE,
- LocalTimeType.LOCAL_DATE_TIME_TYPE,
- BasicType.VOID_TYPE,
- ArrayType.STRING_ARRAY_TYPE,
- ArrayType.BOOLEAN_ARRAY_TYPE,
- ArrayType.BYTE_ARRAY_TYPE,
- ArrayType.SHORT_ARRAY_TYPE,
- ArrayType.INT_ARRAY_TYPE,
- ArrayType.LONG_ARRAY_TYPE,
- ArrayType.FLOAT_ARRAY_TYPE,
- ArrayType.DOUBLE_ARRAY_TYPE,
- new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE)
- })
- });
- }
-
@Override
public SinkWriter<SeaTunnelRow, Void, Void>
createWriter(SinkWriter.Context context)
throws IOException {
diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/test/java/org/apache/seatunnel/translation/spark/sink/SparkSinkTest.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/test/java/org/apache/seatunnel/translation/spark/sink/SparkSinkTest.java
index d22789e1be..9f9292e20b 100644
--- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/test/java/org/apache/seatunnel/translation/spark/sink/SparkSinkTest.java
+++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/test/java/org/apache/seatunnel/translation/spark/sink/SparkSinkTest.java
@@ -17,6 +17,12 @@
package org.apache.seatunnel.translation.spark.sink;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
@@ -285,11 +291,121 @@ public class SparkSinkTest {
row3
});
+ SeaTunnelRowType rowType =
+ new SeaTunnelRowType(
+ new String[] {
+ "int",
+ "string",
+ "boolean",
+ "float",
+ "double",
+ "byte",
+ "short",
+ "long",
+ "decimal",
+ "date",
+ "timestamp",
+ "null",
+ "array_string",
+ "array_boolean",
+ "array_byte",
+ "array_short",
+ "array_int",
+ "array_long",
+ "array_float",
+ "array_double",
+ "map",
+ "row"
+ },
+ new SeaTunnelDataType[] {
+ BasicType.INT_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.BOOLEAN_TYPE,
+ BasicType.FLOAT_TYPE,
+ BasicType.DOUBLE_TYPE,
+ BasicType.BYTE_TYPE,
+ BasicType.SHORT_TYPE,
+ BasicType.LONG_TYPE,
+ new org.apache.seatunnel.api.table.type.DecimalType(10, 2),
+ LocalTimeType.LOCAL_DATE_TYPE,
+ LocalTimeType.LOCAL_DATE_TIME_TYPE,
+ BasicType.VOID_TYPE,
+ org.apache.seatunnel.api.table.type.ArrayType.STRING_ARRAY_TYPE,
+ org.apache.seatunnel.api.table.type.ArrayType.BOOLEAN_ARRAY_TYPE,
+ org.apache.seatunnel.api.table.type.ArrayType.BYTE_ARRAY_TYPE,
+ org.apache.seatunnel.api.table.type.ArrayType.SHORT_ARRAY_TYPE,
+ org.apache.seatunnel.api.table.type.ArrayType.INT_ARRAY_TYPE,
+ org.apache.seatunnel.api.table.type.ArrayType.LONG_ARRAY_TYPE,
+ org.apache.seatunnel.api.table.type.ArrayType.FLOAT_ARRAY_TYPE,
+ org.apache.seatunnel.api.table.type.ArrayType.DOUBLE_ARRAY_TYPE,
+ new org.apache.seatunnel.api.table.type.MapType<>(
+ BasicType.STRING_TYPE, BasicType.STRING_TYPE),
+ new SeaTunnelRowType(
+ new String[] {
+ "int",
+ "string",
+ "boolean",
+ "float",
+ "double",
+ "byte",
+ "short",
+ "long",
+ "decimal",
+ "date",
+ "timestamp",
+ "null",
+ "array_string",
+ "array_boolean",
+ "array_byte",
+ "array_short",
+ "array_int",
+ "array_long",
+ "array_float",
+ "array_double",
+ "map"
+ },
+ new SeaTunnelDataType[] {
+ BasicType.INT_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.BOOLEAN_TYPE,
+ BasicType.FLOAT_TYPE,
+ BasicType.DOUBLE_TYPE,
+ BasicType.BYTE_TYPE,
+ BasicType.SHORT_TYPE,
+ BasicType.LONG_TYPE,
+ new org.apache.seatunnel.api.table.type.DecimalType(10, 2),
+ LocalTimeType.LOCAL_DATE_TYPE,
+ LocalTimeType.LOCAL_DATE_TIME_TYPE,
+ BasicType.VOID_TYPE,
+ org.apache.seatunnel.api.table.type.ArrayType
+ .STRING_ARRAY_TYPE,
+ org.apache.seatunnel.api.table.type.ArrayType
+ .BOOLEAN_ARRAY_TYPE,
+ org.apache.seatunnel.api.table.type.ArrayType
+ .BYTE_ARRAY_TYPE,
+ org.apache.seatunnel.api.table.type.ArrayType
+ .SHORT_ARRAY_TYPE,
+ org.apache.seatunnel.api.table.type.ArrayType
+ .INT_ARRAY_TYPE,
+ org.apache.seatunnel.api.table.type.ArrayType
+ .LONG_ARRAY_TYPE,
+ org.apache.seatunnel.api.table.type.ArrayType
+ .FLOAT_ARRAY_TYPE,
+ org.apache.seatunnel.api.table.type.ArrayType
+ .DOUBLE_ARRAY_TYPE,
+ new org.apache.seatunnel.api.table.type.MapType<>(
+ BasicType.STRING_TYPE, BasicType.STRING_TYPE)
+ })
+ });
+
Dataset<Row> dataset =
spark.createDataFrame(
Arrays.asList(row1WithRow, row2WithRow, row3WithRow),
structType.add("row", structType));
- SparkSinkInjector.inject(dataset.write(), new SeaTunnelSinkWithBuffer())
+ SparkSinkInjector.inject(
+ dataset.write(),
+ new SeaTunnelSinkWithBuffer(),
+ CatalogTableUtil.getCatalogTable("test", "test", "test", "test", rowType))
.option("checkpointLocation", "/tmp")
.mode(SaveMode.Append)
.save();